testERP5SyncML.py 53.8 KB
Newer Older
Sebastien Robin's avatar
Sebastien Robin committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
##############################################################################
#
# Copyright (c) 2004 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################



#
# Skeleton ZopeTestCase
#

from random import randint

import os, sys
if __name__ == '__main__':
    execfile(os.path.join(sys.path[0], 'framework.py'))

# Needed in order to have a log file inside the current folder
os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log')
os.environ['EVENT_LOG_SEVERITY'] = '-300'

from Testing import ZopeTestCase
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
47 48
from DateTime import DateTime
from Products.ERP5.Document.Person import Person
Sebastien Robin's avatar
Sebastien Robin committed
49 50
from AccessControl.SecurityManagement import newSecurityManager, noSecurityManager
from Products.ERP5SyncML.Conduit.ERP5Conduit import ERP5Conduit
51
from Products.ERP5SyncML.SyncCode import SyncCode
Sebastien Robin's avatar
Sebastien Robin committed
52 53 54 55 56 57
from zLOG import LOG
import time

class TestERP5SyncML(ERP5TypeTestCase):

  # Different variables used for this test
58
  run_all_test = 1
59
  workflow_id = 'edit_workflow'
Sebastien Robin's avatar
Sebastien Robin committed
60 61
  first_name1 = 'Sebastien'
  last_name1 = 'Robin'
62
  description1 = 'description1 --- $sdfr_sdfsdf_oisfsopf'
63 64 65 66
  lang1 = 'fr'
  format2 = 'html'
  format3 = 'xml'
  format4 = 'txt'
Sebastien Robin's avatar
Sebastien Robin committed
67 68
  first_name2 = 'Jean-Paul'
  last_name2 = 'Smets'
69
  description2 = 'description2@  $*< <<<  ----- >>>></title>&oekd'
70
  lang2 = 'en'
Sebastien Robin's avatar
Sebastien Robin committed
71 72
  first_name3 = 'Yoshinori'
  last_name3 = 'Okuji'
73
  description3 = 'description3 sdf__sdf_df___&&]]]'
74 75 76
  description4 = 'description4 sdflkmooo^^^^]]]]]{{{{{{{'
  lang3 = 'jp'
  lang4 = 'ca'
Sebastien Robin's avatar
Sebastien Robin committed
77 78 79 80 81 82 83 84 85 86
  xml_mapping = 'asXML'
  id1 = '170'
  id2 = '171'
  pub_id = 'Publication'
  sub_id1 = 'Subscription1'
  sub_id2 = 'Subscription2'
  nb_subscription = 2
  nb_publication = 1
  nb_synchronization = 3
  nb_message_first_synchronization = 6
87 88 89 90 91 92
  subscription_url1 = 'file://tmp/sync_client1'
  subscription_url2 = 'file://tmp/sync_client2'
  publication_url = 'file://tmp/sync_server'
  #publication_url = 'server@localhost'
  #subscription_url1 = 'client1@localhost'
  #subscription_url2 = 'client2@localhost'
Sebastien Robin's avatar
Sebastien Robin committed
93 94 95 96 97

  def getBusinessTemplateList(self):
    """
      Return the list of business templates.

98
      the business template sync_crm give 3 folders:
99
      /person_server 
Sebastien Robin's avatar
Sebastien Robin committed
100 101 102
      /person_client1 : empty
      /person_client2 : empty
    """
103
    return ('sync_crm',)
Sebastien Robin's avatar
Sebastien Robin committed
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119

  def getSynchronizationTool(self):
    return getattr(self.getPortal(), 'portal_synchronizations', None)

  def getPersonClient1(self):
    return getattr(self.getPortal(), 'person_client1', None)

  def getPersonServer(self):
    return getattr(self.getPortal(), 'person_server', None)

  def getPersonClient2(self):
    return getattr(self.getPortal(), 'person_client2', None)

  def getPortalId(self):
    return self.getPortal().getId()

120
  def testHasEverything(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
121
    # Test if portal_synchronizations was created
122
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
123 124 125 126
    if not quiet:
      ZopeTestCase._print('\nTest Has Everything ')
      LOG('Testing... ',0,'testHasEverything')
    self.failUnless(self.getSynchronizationTool()!=None)
127 128 129
    #self.failUnless(self.getPersonServer()!=None)
    #self.failUnless(self.getPersonClient1()!=None)
    #self.failUnless(self.getPersonClient2()!=None)
Sebastien Robin's avatar
Sebastien Robin committed
130

131 132
  def testAddPublication(self, quiet=0, run=run_all_test):
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
133 134 135 136 137
    if not quiet:
      ZopeTestCase._print('\nTest Add a Publication ')
      LOG('Testing... ',0,'testAddPublication')
    portal_id = self.getPortalName()
    portal_sync = self.getSynchronizationTool()
138
    portal_sync.manage_addPublication(self.pub_id,self.publication_url,
Sebastien Robin's avatar
Sebastien Robin committed
139
                                      '/%s/person_server' % portal_id,'',
140
                                      self.xml_mapping,'')
Sebastien Robin's avatar
Sebastien Robin committed
141 142 143
    pub = portal_sync.getPublication(self.pub_id)
    self.failUnless(pub is not None)

144 145
  def testAddSubscription1(self, quiet=0, run=run_all_test):
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
146 147 148 149 150
    if not quiet:
      ZopeTestCase._print('\nTest Add First Subscription ')
      LOG('Testing... ',0,'testAddSubscription1')
    portal_id = self.getPortalId()
    portal_sync = self.getSynchronizationTool()
151
    portal_sync.manage_addSubscription(self.sub_id1,self.publication_url,
152
                          self.subscription_url1,'/%s/person_client1' % portal_id,'',
153
                          self.xml_mapping,'')
Sebastien Robin's avatar
Sebastien Robin committed
154 155 156
    sub = portal_sync.getSubscription(self.sub_id1)
    self.failUnless(sub is not None)

157 158
  def testAddSubscription2(self, quiet=0, run=run_all_test):
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
159 160 161 162 163
    if not quiet:
      ZopeTestCase._print('\nTest Add Second Subscription ')
      LOG('Testing... ',0,'testAddSubscription2')
    portal_id = self.getPortalId()
    portal_sync = self.getSynchronizationTool()
164
    portal_sync.manage_addSubscription(self.sub_id2,self.publication_url,
165
                          self.subscription_url2,'/%s/person_client2' % portal_id,'',
166
                          self.xml_mapping,'')
Sebastien Robin's avatar
Sebastien Robin committed
167 168 169
    sub = portal_sync.getSubscription(self.sub_id2)
    self.failUnless(sub is not None)

170
  def login(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
171
    uf = self.getPortal().acl_users
172 173
    uf._doAddUser('seb', '', ['Manager'], [])
    user = uf.getUserById('seb').__of__(uf)
Sebastien Robin's avatar
Sebastien Robin committed
174 175
    newSecurityManager(None, user)

176
  def populatePersonServer(self, quiet=0, run=run_all_test):
177
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
178 179
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server ')
180
      LOG('Testing... ',0,'populatePersonServer')
Sebastien Robin's avatar
Sebastien Robin committed
181
    self.login()
182 183 184 185 186 187 188 189 190 191 192 193 194
    portal = self.getPortal()
    if not hasattr(portal,'person_server'):
      portal.portal_types.constructContent(type_name = 'Person Module',
                                           container = portal,
                                           id = 'person_server')
    if not hasattr(portal,'person_client1'):
      portal.portal_types.constructContent(type_name = 'Person Module',
                                           container = portal,
                                           id = 'person_client1')
    if not hasattr(portal,'person_client2'):
      portal.portal_types.constructContent(type_name = 'Person Module',
                                           container = portal,
                                           id = 'person_client2')
Sebastien Robin's avatar
Sebastien Robin committed
195
    person_id = ''
196
    person_server = self.getPersonServer()
Sebastien Robin's avatar
Sebastien Robin committed
197 198 199 200 201 202 203 204 205 206 207 208
    person1 = person_server.newContent(id=self.id1,portal_type='Person')
    kw = {'first_name':self.first_name1,'last_name':self.last_name1,
          'description':self.description1}
    person1.edit(**kw)
    person2 = person_server.newContent(id=self.id2,portal_type='Person')
    kw = {'first_name':self.first_name2,'last_name':self.last_name2,
          'description':self.description2}
    person2.edit(**kw)
    nb_person = len(person_server.objectValues())
    self.failUnless(nb_person==2)
    return nb_person

209
  def setupPublicationAndSubscription(self, quiet=0, run=run_all_test):
210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227
    self.testAddPublication(quiet=1,run=1)
    self.testAddSubscription1(quiet=1,run=1)
    self.testAddSubscription2(quiet=1,run=1)
      
  def setupPublicationAndSubscriptionAndGid(self, quiet=0, run=run_all_test):
    self.setupPublicationAndSubscription(quiet=1,run=1)
    def getGid(object):
      return object.getTitle()
    portal_sync = self.getSynchronizationTool()
    sub1 = portal_sync.getSubscription(self.sub_id1)
    sub2 = portal_sync.getSubscription(self.sub_id2)
    pub = portal_sync.getPublication(self.pub_id)
    pub.setGidGenerator(getGid)
    sub1.setGidGenerator(getGid)
    sub2.setGidGenerator(getGid)
    pub.setIdGenerator('generateNewId')
    sub1.setIdGenerator('generateNewId')
    sub2.setIdGenerator('generateNewId')
Sebastien Robin's avatar
Sebastien Robin committed
228

229
  def testGetSynchronizationList(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
230 231 232
    # This test the getSynchronizationList, ie,
    # We want to see if we retrieve both the subscription
    # and the publication
233
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
234 235 236
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationList ')
      LOG('Testing... ',0,'testGetSynchronizationList')
237
    self.setupPublicationAndSubscription(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
238 239 240 241
    portal_sync = self.getSynchronizationTool()
    synchronization_list = portal_sync.getSynchronizationList()
    self.failUnless(len(synchronization_list)==self.nb_synchronization)

242
  def testGetObjectList(self, quiet=0, run=run_all_test):
243 244 245 246 247
    """
    This test the default getObjectList, ie, when the
    query is 'objectValues', and this also test if we enter
    a new method for the query
    """
248
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
249 250 251 252
    if not quiet:
      ZopeTestCase._print('\nTest getObjectList ')
      LOG('Testing... ',0,'testGetObjectList')
    self.login()
253
    self.setupPublicationAndSubscription(quiet=1,run=1)
254
    nb_person = self.populatePersonServer(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
    portal_sync = self.getSynchronizationTool()
    publication_list = portal_sync.getPublicationList()
    publication = publication_list[0]
    object_list = publication.getObjectList()
    self.failUnless(len(object_list)==nb_person)
    # now try to set a different method for query
    def query(object):
      object_list = object.objectValues()
      return_list = []
      for o in object_list:
        if o.getId()==self.id1:
          return_list.append(o)
      return return_list
    publication.setQuery(query)
    object_list = publication.getObjectList()
    self.failUnless(len(object_list)==1)

272
  def testExportImport(self, quiet=0, run=run_all_test):
273 274 275 276
    """
    We will try to export a person with asXML
    And then try to add it to another folder with a conduit
    """
277
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
278 279 280 281
    if not quiet:
      ZopeTestCase._print('\nTest Export and Import ')
      LOG('Testing... ',0,'testExportImport')
    self.login()
282
    self.populatePersonServer(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
283 284 285 286 287 288 289 290 291 292
    person_server = self.getPersonServer()
    person_client1 = self.getPersonClient1()
    person = person_server._getOb(self.id1)
    xml_output = person.asXML()
    conduit = ERP5Conduit()
    conduit.addNode(object=person_client1,xml=xml_output)
    self.failUnless(len(person_client1.objectValues())==1)
    new_object = person_client1._getOb(self.id1)
    self.failUnless(new_object.getLastName()==self.last_name1)
    self.failUnless(new_object.getFirstName()==self.first_name1)
293 294 295 296 297
    # XXX We should also looks at the workflow history
    self.failUnless(len(new_object.workflow_history[self.workflow_id])==2)
    s_local_role = person_server.get_local_roles()
    c_local_role = person_client1.get_local_roles()
    self.assertEqual(s_local_role,c_local_role)
Sebastien Robin's avatar
Sebastien Robin committed
298

299
  def synchronize(self, id, run=run_all_test):
300 301 302 303
    """
    This just define how we synchronize, we have
    to define it here because it is specific to the unit testing
    """
Sebastien Robin's avatar
Sebastien Robin committed
304
    portal_sync = self.getSynchronizationTool()
305
    #portal_sync.email = None # XXX To be removed
Sebastien Robin's avatar
Sebastien Robin committed
306 307 308 309 310 311 312
    subscription = portal_sync.getSubscription(id)
    publication = None
    for publication in portal_sync.getPublicationList():
      if publication.getPublicationUrl()==subscription.getSubscriptionUrl():
        publication = publication
    self.failUnless(publication is not None)
    # reset files, because we do sync by files
313 314 315 316
    file = open('/tmp/sync_client1','w')
    file.write('')
    file.close()
    file = open('/tmp/sync_client2','w')
Sebastien Robin's avatar
Sebastien Robin committed
317 318 319 320 321 322
    file.write('')
    file.close()
    file = open('/tmp/sync','w')
    file.write('')
    file.close()
    nb_message = 1
323
    result = portal_sync.SubSync(subscription.getTitle())
324
    while result['has_response']==1:
325 326
      portal_sync.PubSync(publication.getTitle())
      result = portal_sync.SubSync(subscription.getTitle())
327
      nb_message += 1 + result['has_response']
Sebastien Robin's avatar
Sebastien Robin committed
328 329
    return nb_message

330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
  def synchronizeWithBrokenMessage(self, id, run=run_all_test):
    """
    This just define how we synchronize, we have
    to define it here because it is specific to the unit testing
    """
    portal_sync = self.getSynchronizationTool()
    #portal_sync.email = None # XXX To be removed
    subscription = portal_sync.getSubscription(id)
    publication = None
    for publication in portal_sync.getPublicationList():
      if publication.getPublicationUrl()==subscription.getSubscriptionUrl():
        publication = publication
    self.failUnless(publication is not None)
    # reset files, because we do sync by files
    file = open('/tmp/sync_client1','w')
    file.write('')
    file.close()
    file = open('/tmp/sync_client2','w')
    file.write('')
    file.close()
    file = open('/tmp/sync','w')
    file.write('')
    file.close()
    nb_message = 1
354
    result = portal_sync.SubSync(subscription.getTitle())
355 356 357
    while result['has_response']==1:
      # We do thing three times, so that we will test
      # if we manage well duplicate messages
358 359 360 361 362 363
      portal_sync.PubSync(publication.getTitle())
      portal_sync.PubSync(publication.getTitle())
      portal_sync.PubSync(publication.getTitle())
      result = portal_sync.SubSync(subscription.getTitle())
      result = portal_sync.SubSync(subscription.getTitle())
      result = portal_sync.SubSync(subscription.getTitle())
364 365 366
      nb_message += 1 + result['has_response']
    return nb_message

367
  def testFirstSynchronization(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
368 369
    # We will try to populate the folder person_client1
    # with the data form person_server
370
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
371 372 373 374
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization ')
      LOG('Testing... ',0,'testFirstSynchronization')
    self.login()
375
    self.setupPublicationAndSubscription(quiet=1,run=1)
376
    nb_person = self.populatePersonServer(quiet=1,run=1)
377 378 379
    portal_sync = self.getSynchronizationTool()
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC)
Sebastien Robin's avatar
Sebastien Robin committed
380 381
    # Synchronize the first client
    nb_message1 = self.synchronize(self.sub_id1)
382 383 384 385 386
    for sub in portal_sync.getSubscriptionList():
      if sub.getTitle() == self.sub_id1:
        self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
      else:
        self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC)
Sebastien Robin's avatar
Sebastien Robin committed
387 388 389
    self.failUnless(nb_message1==self.nb_message_first_synchronization)
    # Synchronize the second client
    nb_message2 = self.synchronize(self.sub_id2)
390 391
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
Sebastien Robin's avatar
Sebastien Robin committed
392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413
    self.failUnless(nb_message2==self.nb_message_first_synchronization)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    subscription2 = portal_sync.getSubscription(self.sub_id2)
    self.failUnless(len(subscription1.getObjectList())==nb_person)
    person_server = self.getPersonServer() # We also check we don't
                                           # modify initial ob
    person1_s = person_server._getOb(self.id1)
    self.failUnless(person1_s.getId()==self.id1)
    self.failUnless(person1_s.getFirstName()==self.first_name1)
    self.failUnless(person1_s.getLastName()==self.last_name1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.failUnless(person1_c.getId()==self.id1)
    self.failUnless(person1_c.getFirstName()==self.first_name1)
    self.failUnless(person1_c.getLastName()==self.last_name1)
    self.failUnless(len(subscription2.getObjectList())==nb_person)
    person_client2 = self.getPersonClient2()
    person2_c = person_client2._getOb(self.id1)
    self.failUnless(person2_c.getId()==self.id1)
    self.failUnless(person2_c.getFirstName()==self.first_name1)
    self.failUnless(person2_c.getLastName()==self.last_name1)

414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443
  def testFirstSynchronizationWithLongLines(self, quiet=0, run=run_all_test):
    # We will try to populate the folder person_client1
    # with the data form person_server
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest First Synchronization With Long Lines ')
      LOG('Testing... ',0,'testFirstSynchronizationWithLongLines')
    self.login()
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    person_server = self.getPersonServer()
    long_line = 'a' * 10000 + ' --- '
    person1_s = person_server._getOb(self.id1)
    kw = {'first_name':long_line} 
    person1_s.edit(**kw)
    # Synchronize the first client
    nb_message1 = self.synchronize(self.sub_id1)
    self.failUnless(nb_message1==self.nb_message_first_synchronization)
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.failUnless(len(subscription1.getObjectList())==nb_person)
    self.failUnless(person1_s.getId()==self.id1)
    self.failUnless(person1_s.getFirstName()==long_line)
    self.failUnless(person1_s.getLastName()==self.last_name1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.failUnless(person1_c.getId()==self.id1)
    self.failUnless(person1_c.getFirstName()==long_line)
    self.failUnless(person1_c.getLastName()==self.last_name1)

444
  def testGetObjectFromGid(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
445 446
    # We will try to get an object from a publication
    # just by givin the gid
447
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
448 449 450 451
    if not quiet:
      ZopeTestCase._print('\nTest getObjectFromGid ')
      LOG('Testing... ',0,'testGetObjectFromGid')
    self.login()
452
    self.setupPublicationAndSubscription(quiet=1,run=1)
453
    self.populatePersonServer(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
454 455 456 457 458 459 460
    # By default we can just give the id
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    object = publication.getObjectFromGid(self.id1)
    self.failUnless(object is not None)
    self.failUnless(object.getId()==self.id1)

461
  def testGetSynchronizationState(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
462 463
    # We will try to get the state of objects
    # that are just synchronized,
464
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
465 466 467
    if not quiet:
      ZopeTestCase._print('\nTest getSynchronizationState ')
      LOG('Testing... ',0,'testGetSynchronizationState')
468
    self.testFirstSynchronization(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
469 470 471 472 473 474 475 476 477 478 479 480 481
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    state_list_s = portal_sync.getSynchronizationState(person1_s)
    self.failUnless(len(state_list_s)==self.nb_subscription) # one state
                                                  # for each subscriber
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    state_list_c = portal_sync.getSynchronizationState(person1_c)
    self.failUnless(len(state_list_c)==1) # one state
                                        # for each subscriber
    self.checkSynchronizationStateIsSynchronized()

482
  def checkSynchronizationStateIsSynchronized(self, quiet=0, run=run_all_test):
Sebastien Robin's avatar
Sebastien Robin committed
483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    for person in person_server.objectValues():
      state_list = portal_sync.getSynchronizationState(person)
      for state in state_list:
        self.failUnless(state[1]==state[0].SYNCHRONIZED)
    person_client1 = self.getPersonClient1()
    for person in person_client1.objectValues():
      state_list = portal_sync.getSynchronizationState(person)
      for state in state_list:
        self.failUnless(state[1]==state[0].SYNCHRONIZED)
    person_client2 = self.getPersonClient2()
    for person in person_client2.objectValues():
      state_list = portal_sync.getSynchronizationState(person)
      for state in state_list:
        self.failUnless(state[1]==state[0].SYNCHRONIZED)
Sebastien Robin's avatar
Sebastien Robin committed
499 500 501 502
    # Check for each signature that the tempXML is None
    for sub in portal_sync.getSubscriptionList():
      for m in sub.getSignatureList():
        self.assertEquals(m.getTempXML(),None)
503
        self.assertEquals(m.getPartialXML(),None)
Sebastien Robin's avatar
Sebastien Robin committed
504 505 506
    for pub in portal_sync.getPublicationList():
      for sub in pub.getSubscriberList():
        for m in sub.getSignatureList():
507
          self.assertEquals(m.getPartialXML(),None)
Sebastien Robin's avatar
Sebastien Robin committed
508

509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
  def checkSynchronizationStateIsConflict(self, quiet=0, run=run_all_test):
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    for person in person_server.objectValues():
      if person.getId()==self.id1:
        state_list = portal_sync.getSynchronizationState(person)
        for state in state_list:
          self.failUnless(state[1]==state[0].CONFLICT)
    person_client1 = self.getPersonClient1()
    for person in person_client1.objectValues():
      if person.getId()==self.id1:
        state_list = portal_sync.getSynchronizationState(person)
        for state in state_list:
          self.failUnless(state[1]==state[0].CONFLICT)
    person_client2 = self.getPersonClient2()
    for person in person_client2.objectValues():
      if person.getId()==self.id1:
        state_list = portal_sync.getSynchronizationState(person)
        for state in state_list:
          self.failUnless(state[1]==state[0].CONFLICT)
    # make sure sub object are also in a conflict mode
    person = person_client1._getOb(self.id1)
    sub_person = person.newContent(id=self.id1,portal_type='Person')
    state_list = portal_sync.getSynchronizationState(sub_person)
    for state in state_list:
      self.failUnless(state[1]==state[0].CONFLICT)

536 537
  def testUpdateSimpleData(self, quiet=0, run=run_all_test):
    if not run: return
Sebastien Robin's avatar
Sebastien Robin committed
538 539 540
    if not quiet:
      ZopeTestCase._print('\nTest Update Simple Data ')
      LOG('Testing... ',0,'testUpdateSimpleData')
541
    self.testFirstSynchronization(quiet=1,run=1)
Sebastien Robin's avatar
Sebastien Robin committed
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558
    # First we do only modification on server
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    kw = {'first_name':self.first_name3,'last_name':self.last_name3}
    person1_s.edit(**kw)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.failUnless(person1_s.getFirstName()==self.first_name3)
    self.failUnless(person1_s.getLastName()==self.last_name3)
    self.failUnless(person1_c.getFirstName()==self.first_name3)
    self.failUnless(person1_c.getLastName()==self.last_name3)
    # Then we do only modification on a client
    kw = {'first_name':self.first_name1,'last_name':self.last_name1}
    person1_c.edit(**kw)
559
    #person1_c.setModificationDate(DateTime()+1)
Sebastien Robin's avatar
Sebastien Robin committed
560 561 562 563 564 565 566 567 568 569
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person1_s.getFirstName()==self.first_name1)
    self.failUnless(person1_s.getLastName()==self.last_name1)
    self.failUnless(person1_c.getFirstName()==self.first_name1)
    self.failUnless(person1_c.getLastName()==self.last_name1)
    # Then we do only modification on both the client and the server
    # and of course, on the same object
    kw = {'first_name':self.first_name3}
    person1_s.edit(**kw)
570
    #person1_s.setModificationDate(DateTime()+2)
Sebastien Robin's avatar
Sebastien Robin committed
571 572
    kw = {'description':self.description3}
    person1_c.edit(**kw)
573
    #person1_c.setModificationDate(DateTime()+2)
574 575 576 577 578 579
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person1_s.getFirstName()==self.first_name3)
    self.failUnless(person1_s.getDescription()==self.description3)
    self.failUnless(person1_c.getFirstName()==self.first_name3)
    self.failUnless(person1_c.getDescription()==self.description3)
Sebastien Robin's avatar
Sebastien Robin committed
580

581
  def testGetConflictList(self, quiet=0, run=run_all_test):
582
    # We will try to generate a conflict and then to get it
583
    # We will also make sure it contains what we want
584
    if not run: return
585 586 587
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List ')
      LOG('Testing... ',0,'testGetConflictList')
588
    self.testFirstSynchronization(quiet=1,run=1)
589 590 591 592 593 594 595 596 597
    # First we do only modification on server
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person1_s.setDescription(self.description2)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    person1_c.setDescription(self.description3)
    self.synchronize(self.sub_id1)
598 599 600
    conflict_list = portal_sync.getConflictList()
    self.failUnless(len(conflict_list)==1)
    conflict = conflict_list[0]
601 602
    self.failUnless(person1_c.getDescription()==self.description3)
    self.failUnless(person1_s.getDescription()==self.description2)
603 604 605 606 607 608
    self.failUnless(conflict.getPropertyId()=='description')
    self.failUnless(conflict.getPublisherValue()==self.description2)
    self.failUnless(conflict.getSubscriberValue()==self.description3)
    subscriber = conflict.getSubscriber()
    self.failUnless(subscriber.getSubscriptionUrl()==self.subscription_url1)

609
  def testGetPublisherAndSubscriberDocument(self, quiet=0, run=run_all_test):
610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629
    # We will try to generate a conflict and then to get it
    # We will also make sure it contains what we want
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Get Publisher And Subscriber Document ')
      LOG('Testing... ',0,'testGetPublisherAndSubscriberDocument')
    self.testGetConflictList(quiet=1,run=1)
    # First we do only modification on server
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    conflict_list = portal_sync.getConflictList()
    conflict = conflict_list[0]
    publisher_document = conflict.getPublisherDocument()
    self.failUnless(publisher_document.getDescription()==self.description2)
    subscriber_document = conflict.getSubscriberDocument()
    self.failUnless(subscriber_document.getDescription()==self.description3)

630
  def testApplyPublisherValue(self, quiet=0, run=run_all_test):
631 632
    # We will try to generate a conflict and then to get it
    # We will also make sure it contains what we want
633
    if not run: return
634
    self.testGetConflictList(quiet=1,run=1)
635 636 637 638 639 640 641 642 643 644 645 646 647 648
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Value ')
      LOG('Testing... ',0,'testApplyPublisherValue')
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
    conflict = conflict_list[0]
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person1_c.getDescription()==self.description2)
649
    self.failUnless(person1_s.getDescription()==self.description2)
650 651 652 653 654 655
    conflict_list = portal_sync.getConflictList()
    self.failUnless(len(conflict_list)==0)

  def testApplySubscriberValue(self, quiet=0, run=run_all_test):
    # We will try to generate a conflict and then to get it
    # We will also make sure it contains what we want
656 657
    if not run: return
    self.testGetConflictList(quiet=1,run=1)
658 659
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
660 661 662
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Value ')
      LOG('Testing... ',0,'testApplySubscriberValue')
663 664 665 666 667 668 669 670 671 672 673 674 675
    conflict = conflict_list[0]
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    conflict.applySubscriberValue()
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person1_s.getDescription()==self.description3)
    self.failUnless(person1_c.getDescription()==self.description3)
    conflict_list = portal_sync.getConflictList()
    self.failUnless(len(conflict_list)==0)

676 677 678 679 680 681 682 683
  def populatePersonServerWithSubObject(self, quiet=0, run=run_all_test):
    """
    Before this method, we need to call populatePersonServer
    Then it will give the following tree :
    - person_server :
      - id1
        - id1
          - id2
684
          - id1
685 686 687 688 689 690 691 692 693 694 695 696
      - id2
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Populate Person Server With Sub Object ')
      LOG('Testing... ',0,'populatePersonServerWithSubObject')
    person_server = self.getPersonServer()
    person1 = person_server._getOb(self.id1)
    sub_person1 = person1.newContent(id=self.id1,portal_type='Person')
    kw = {'first_name':self.first_name1,'last_name':self.last_name1,
          'description':self.description1}
    sub_person1.edit(**kw)
697 698 699 700 701
    sub_sub_person1 = sub_person1.newContent(id=self.id1,portal_type='Person')
    kw = {'first_name':self.first_name1,'last_name':self.last_name1,
          'description':self.description1}
    sub_sub_person1.edit(**kw)
    sub_sub_person2 = sub_person1.newContent(id=self.id2,portal_type='Person')
702 703
    kw = {'first_name':self.first_name2,'last_name':self.last_name2,
          'description':self.description2}
704
    sub_sub_person2.edit(**kw)
705
    # remove ('','portal...','person_server')
706 707 708
    len_path = len(sub_sub_person1.getPhysicalPath()) - 3 
    self.failUnless(len_path==3)
    len_path = len(sub_sub_person2.getPhysicalPath()) - 3 
709
    self.failUnless(len_path==3)
710

711 712 713 714 715 716 717 718 719 720 721 722 723
  def testAddSubObject(self, quiet=0, run=run_all_test):
    """
    In this test, we synchronize, then add sub object on the
    server and then see if the next synchronization will also
    create sub-objects on the client
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Add Sub Object ')
      LOG('Testing... ',0,'testAddSubObject')
    self.populatePersonServerWithSubObject(quiet=1,run=1)
    self.synchronize(self.sub_id1)
724
    self.synchronize(self.sub_id2)
725 726 727 728
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    sub_person1_c = person1_c._getOb(self.id1)
729 730
    sub_sub_person1 = sub_person1_c._getOb(self.id1)
    sub_sub_person2 = sub_person1_c._getOb(self.id2)
731
    # remove ('','portal...','person_server')
732
    len_path = len(sub_sub_person1.getPhysicalPath()) - 3 
733
    self.failUnless(len_path==3)
734 735 736 737 738 739 740 741 742 743
    len_path = len(sub_sub_person2.getPhysicalPath()) - 3 
    self.failUnless(len_path==3)
    self.failUnless(sub_sub_person1.getDescription()==self.description1)
    self.failUnless(sub_sub_person1.getFirstName()==self.first_name1)
    self.failUnless(sub_sub_person1.getLastName()==self.last_name1)
    self.failUnless(sub_sub_person2.getDescription()==self.description2)
    self.failUnless(sub_sub_person2.getFirstName()==self.first_name2)
    self.failUnless(sub_sub_person2.getLastName()==self.last_name2)

  def testUpdateSubObject(self, quiet=0, run=run_all_test):
744
    """
745
      In this test, we start with a tree of object already
746
    synchronized, then we update a subobject, and we will see
747 748 749
    if it is updated correctly.
      To make this test a bit more harder, we will update on both
    the client and the server by the same time
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
    """
    if not run: return
    self.testAddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Sub Object ')
      LOG('Testing... ',0,'testUpdateSubObject')
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    sub_person1_c = person1_c._getOb(self.id1)
    sub_sub_person_c = sub_person1_c._getOb(self.id2)
    person_server = self.getPersonServer()
    sub_sub_person_s = person_server._getOb(self.id1)._getOb(self.id1)._getOb(self.id2)
    kw = {'first_name':self.first_name3}
    sub_sub_person_c.edit(**kw)
    kw = {'description':self.description3}
    sub_sub_person_s.edit(**kw)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(sub_sub_person_c.getDescription()==self.description3)
    self.failUnless(sub_sub_person_c.getFirstName()==self.first_name3)
    self.failUnless(sub_sub_person_s.getDescription()==self.description3)
    self.failUnless(sub_sub_person_s.getFirstName()==self.first_name3)
772

773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001
  def testDeleteObject(self, quiet=0, run=run_all_test):
    """
      We will do a first synchronization, then delete an object on both
    sides, and we will see if nothing is left on the server and also
    on the two clients
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Object ')
      LOG('Testing... ',0,'testDeleteObject')
    person_server = self.getPersonServer()
    person_server.manage_delObjects(self.id1)
    person_client1 = self.getPersonClient1()
    person_client1.manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    portal_sync = self.getSynchronizationTool()
    publication = portal_sync.getPublication(self.pub_id)
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    subscription2 = portal_sync.getSubscription(self.sub_id2)
    self.failUnless(len(publication.getObjectList())==0)
    self.failUnless(len(subscription1.getObjectList())==0)
    self.failUnless(len(subscription2.getObjectList())==0)

  def testDeleteSubObject(self, quiet=0, run=run_all_test):
    """
      We will do a first synchronization, then delete a sub-object on both
    sides, and we will see if nothing is left on the server and also
    on the two clients
    - before :         after :
      - id1             - id1 
        - id1             - id1
          - id2         - id2
          - id1
      - id2
    """
    if not run: return
    self.testAddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Delete Sub Object ')
      LOG('Testing... ',0,'testDeleteSubObject')
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    sub_object_s.manage_delObjects(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    sub_object_c1.manage_delObjects(self.id2)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    len_s = len(sub_object_s.objectValues())
    len_c1 = len(sub_object_c1.objectValues())
    len_c2 = len(sub_object_c2.objectValues())
    self.failUnless(len_s==len_c1==len_c2==0)

  def testGetConflictListOnSubObject(self, quiet=0, run=run_all_test):
    """
    We will change several attributes on a sub object on both the server
    and a client, then we will see if we have correctly the conflict list
    """
    if not run: return
    self.testAddSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Get Conflict List On Sub Object ')
      LOG('Testing... ',0,'testGetConflictListOnSubObject')
    person_server = self.getPersonServer()
    object_s = person_server._getOb(self.id1)
    sub_object_s = object_s._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    # Change values so that we will get conflicts
    kw = {'language':self.lang2,'description':self.description2}
    sub_object_s.edit(**kw)
    kw = {'language':self.lang3,'description':self.description3}
    sub_object_c1.edit(**kw)
    self.synchronize(self.sub_id1)
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
    self.failUnless(len(conflict_list)==2)
    conflict_list = portal_sync.getConflictList(sub_object_c1)
    self.failUnless(len(conflict_list)==0)
    conflict_list = portal_sync.getConflictList(object_s)
    self.failUnless(len(conflict_list)==0)
    conflict_list = portal_sync.getConflictList(sub_object_s)
    self.failUnless(len(conflict_list)==2)

  def testApplyPublisherDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    there's several conflict on a sub object, we will see if we can
    correctly have the publisher version of this document
    """
    if not run: return
    self.testGetConflictListOnSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Publisher Document On Sub Object ')
      LOG('Testing... ',0,'testApplyPublisherDocumentOnSubObject')
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
    conflict = conflict_list[0]
    conflict.applyPublisherDocument()
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(sub_object_s.getDescription()==self.description2)
    self.failUnless(sub_object_s.getLanguage()==self.lang2)
    self.failUnless(sub_object_c1.getDescription()==self.description2)
    self.failUnless(sub_object_c1.getLanguage()==self.lang2)
    self.failUnless(sub_object_c2.getDescription()==self.description2)
    self.failUnless(sub_object_c2.getLanguage()==self.lang2)

  def testApplySubscriberDocumentOnSubObject(self, quiet=0, run=run_all_test):
    """
    there's several conflict on a sub object, we will see if we can
    correctly have the subscriber version of this document
    """
    if not run: return
    self.testGetConflictListOnSubObject(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Apply Subscriber Document On Sub Object ')
      LOG('Testing... ',0,'testApplySubscriberDocumentOnSubObject')
    portal_sync = self.getSynchronizationTool()
    conflict_list = portal_sync.getConflictList()
    conflict = conflict_list[0]
    conflict.applySubscriberDocument()
    person_server = self.getPersonServer()
    sub_object_s = person_server._getOb(self.id1)._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    sub_object_c1 = person_client1._getOb(self.id1)._getOb(self.id1)
    person_client2 = self.getPersonClient2()
    sub_object_c2 = person_client2._getOb(self.id1)._getOb(self.id1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(sub_object_s.getDescription()==self.description3)
    self.failUnless(sub_object_s.getLanguage()==self.lang3)
    self.failUnless(sub_object_c1.getDescription()==self.description3)
    self.failUnless(sub_object_c1.getLanguage()==self.lang3)
    self.failUnless(sub_object_c2.getDescription()==self.description3)
    self.failUnless(sub_object_c2.getLanguage()==self.lang3)

  def testSynchronizeWithStrangeGid(self, quiet=0, run=run_all_test):
    """
    By default, the synchronization process use the id in order to
    recognize objects (because by default, getGid==getId. Here, we will see 
    if it also works with a somewhat strange getGid
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize With Strange Gid ')
      LOG('Testing... ',0,'testSynchronizeWithStrangeGid')
    self.login()
    self.setupPublicationAndSubscriptionAndGid(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    # This will test adding object
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.failUnless(len(subscription1.getObjectList())==nb_person)
    publication = portal_sync.getPublication(self.pub_id)
    self.failUnless(len(publication.getObjectList())==nb_person)
    gid = self.first_name1 +  ' ' + self.last_name1 # ie the title 'Sebastien Robin'
    person_c1 = subscription1.getObjectFromGid(gid)
    id_c1 = person_c1.getId()
    self.failUnless(id_c1 in ('1','2')) # id given by the default generateNewId
    person_s = publication.getObjectFromGid(gid)
    id_s = person_s.getId()
    self.failUnless(id_s==self.id1)
    # This will test updating object
    person_s.setDescription(self.description3)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person_s.getDescription()==self.description3)
    self.failUnless(person_c1.getDescription()==self.description3)
    # This will test deleting object
    person_server = self.getPersonServer()
    person_client1 = self.getPersonClient1()
    person_server.manage_delObjects(self.id2)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(len(subscription1.getObjectList())==(nb_person-1))
    self.failUnless(len(publication.getObjectList())==(nb_person-1))
    person_s = publication.getObjectFromGid(gid)
    person_c1 = subscription1.getObjectFromGid(gid)
    self.failUnless(person_s.getDescription()==self.description3)
    self.failUnless(person_c1.getDescription()==self.description3)

  def testMultiNodeConflict(self, quiet=0, run=run_all_test):
    """
    We will create conflicts with 3 differents nodes, and we will
    solve it by taking one full version of documents.
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Multi Node Conflict ')
      LOG('Testing... ',0,'testMultiNodeConflict')
    portal_sync = self.getSynchronizationTool()
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    kw = {'language':self.lang2,'description':self.description2,
          'format':self.format2}
    person1_s.edit(**kw)
    person_client1 = self.getPersonClient1()
    person1_c1 = person_client1._getOb(self.id1)
    kw = {'language':self.lang3,'description':self.description3,
          'format':self.format3}
    person1_c1.edit(**kw)
    person_client2 = self.getPersonClient2()
    person1_c2 = person_client2._getOb(self.id1)
    kw = {'language':self.lang4,'description':self.description4,
          'format':self.format4}
    person1_c2.edit(**kw)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    conflict_list = portal_sync.getConflictList()
    self.failUnless(len(conflict_list)==6)
1002 1003 1004
    # check if we have the state conflict on all clients
    self.checkSynchronizationStateIsConflict()

1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
    # we will take :
    # description on person_server
    # language on person_client1
    # format on person_client2
    for conflict in conflict_list:
      subscriber = conflict.getSubscriber()
      property = conflict.getPropertyId()
      resolve = 0
      if property == 'language':
        if subscriber.getSubscriptionUrl()==self.subscription_url1:
          resolve = 1
          conflict.applySubscriberValue()
      if property == 'format':
        if subscriber.getSubscriptionUrl()==self.subscription_url2:
          resolve = 1
          conflict.applySubscriberValue()
      if not resolve:
        conflict.applyPublisherValue()
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(person1_s.getDescription()==self.description2)
    self.failUnless(person1_c1.getDescription()==self.description2)
    self.failUnless(person1_c2.getDescription()==self.description2)
    self.failUnless(person1_s.getLanguage()==self.lang3)
    self.failUnless(person1_c1.getLanguage()==self.lang3)
    self.failUnless(person1_c2.getLanguage()==self.lang3)
    self.failUnless(person1_s.getFormat()==self.format4)
    self.failUnless(person1_c1.getFormat()==self.format4)
    self.failUnless(person1_c2.getFormat()==self.format4)
Sebastien Robin's avatar
Sebastien Robin committed
1035
        
1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096

  def testSynchronizeWorkflowHistory(self, quiet=0, run=run_all_test):
    """
    We will do a synchronization, then we will edit two times
    the object on the server, then two times the object on the
    client, and see if the global history as 4 more actions.
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Synchronize WorkflowHistory ')
      LOG('Testing... ',0,'testSynchronizeWorkflowHistory')
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    kw1 = {'description':self.description1}
    kw2 = {'description':self.description2}
    len_wf = len(person1_s.workflow_history[self.workflow_id])
    person1_s.edit(**kw2)
    person1_c.edit(**kw2)
    person1_s.edit(**kw1)
    person1_c.edit(**kw1)
    self.synchronize(self.sub_id1)
    self.checkSynchronizationStateIsSynchronized()
    self.failUnless(len(person1_s.workflow_history[self.workflow_id])==len_wf+4)
    self.failUnless(len(person1_c.workflow_history[self.workflow_id])==len_wf+4)

  def testUpdateLocalRole(self, quiet=0, run=run_all_test):
    """
    We will do a first synchronization, then modify, add and delete
    an user role and see if it is correctly synchronized
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Role ')
      LOG('Testing... ',0,'testUpdateLocalRole')
    # First, Create a new user
    uf = self.getPortal().acl_users
    uf._doAddUser('jp', '', ['Manager'], [])
    user = uf.getUserById('jp').__of__(uf)
    # then update create and delete roles
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person2_s = person_server._getOb(self.id2)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    person2_c = person_client1._getOb(self.id2)
    person1_s.manage_setLocalRoles('seb',['Manager','Owner'])
    person2_s.manage_setLocalRoles('jp',['Manager','Owner'])
    person2_s.manage_delLocalRoles(['seb'])
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    role_1_s = person1_s.get_local_roles()
    role_2_s = person2_s.get_local_roles()
    role_1_c = person1_c.get_local_roles()
    role_2_c = person2_c.get_local_roles()
    self.assertEqual(role_1_s,role_1_c)
    self.assertEqual(role_2_s,role_2_c)

1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123
  def testPartialData(self, quiet=0, run=run_all_test):
    """
    We will do a first synchronization, then we will do a change, then
    we will modify the SyncCode max_line value so it
    it will generate many messages
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Partial Data ')
      LOG('Testing... ',0,'testPartialData')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    self.populatePersonServerWithSubObject(quiet=1,run=1)
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    self.checkSynchronizationStateIsSynchronized()
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    sub_person1_c = person1_c._getOb(self.id1)
    sub_sub_person1 = sub_person1_c._getOb(self.id1)
    sub_sub_person2 = sub_person1_c._getOb(self.id2)
    # remove ('','portal...','person_server')
    len_path = len(sub_sub_person1.getPhysicalPath()) - 3 
    self.failUnless(len_path==3)
    len_path = len(sub_sub_person2.getPhysicalPath()) - 3 
    self.failUnless(len_path==3)
1124 1125 1126 1127 1128 1129
    self.assertEquals(sub_sub_person1.getDescription(),self.description1)
    self.assertEquals(sub_sub_person1.getFirstName(),self.first_name1)
    self.assertEquals(sub_sub_person1.getLastName(),self.last_name1)
    self.assertEquals(sub_sub_person2.getDescription(),self.description2)
    self.assertEquals(sub_sub_person2.getFirstName(),self.first_name2)
    self.assertEquals(sub_sub_person2.getLastName(),self.last_name2)
1130 1131
    SyncCode.MAX_LINES = previous_max_lines

1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167
  def testBrokenMessage(self, quiet=0, run=run_all_test):
    """
    With http synchronization, when a message is not well
    received, then we send message again, we want to
    be sure that is such case we don't do stupid things
    
    If we want to make this test more intersting, it is
    better to split messages
    """
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Broken Message ')
      LOG('Testing... ',0,'testBrokenMessage')
    previous_max_lines = SyncCode.MAX_LINES
    SyncCode.MAX_LINES = 10
    self.setupPublicationAndSubscription(quiet=1,run=1)
    nb_person = self.populatePersonServer(quiet=1,run=1)
    # Synchronize the first client
    nb_message1 = self.synchronizeWithBrokenMessage(self.sub_id1)
    #self.failUnless(nb_message1==self.nb_message_first_synchronization)
    portal_sync = self.getSynchronizationTool()
    subscription1 = portal_sync.getSubscription(self.sub_id1)
    self.failUnless(len(subscription1.getObjectList())==nb_person)
    person_server = self.getPersonServer() # We also check we don't
                                           # modify initial ob
    person1_s = person_server._getOb(self.id1)
    self.failUnless(person1_s.getId()==self.id1)
    self.failUnless(person1_s.getFirstName()==self.first_name1)
    self.failUnless(person1_s.getLastName()==self.last_name1)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    self.failUnless(person1_c.getId()==self.id1)
    self.failUnless(person1_c.getFirstName()==self.first_name1)
    self.failUnless(person1_c.getLastName()==self.last_name1)
    SyncCode.MAX_LINES = previous_max_lines

1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
  def testGetSynchronizationType(self, quiet=0, run=run_all_test):
    # We will try to update some simple data, first
    # we change on the server side, the on the client side
    if not run: return
    if not quiet:
      ZopeTestCase._print('\nTest Get Synchronization Type ')
      LOG('Testing... ',0,'testGetSynchronizationType')
    self.testFirstSynchronization(quiet=1,run=1)
    # First we do only modification on server
    # Check for each subsription that the synchronization type
    # is TWO WAY
    portal_sync = self.getSynchronizationTool()
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    kw = {'first_name':self.first_name3,'last_name':self.last_name3}
    person1_s.edit(**kw)
    self.synchronize(self.sub_id1)
    # Then we do only modification on a client
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    kw = {'first_name':self.first_name1,'last_name':self.last_name1}
    person1_c.edit(**kw)
    self.synchronize(self.sub_id1)
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)
    # Then we do only modification on both the client and the server
    # and of course, on the same object
    kw = {'first_name':self.first_name3}
    person1_s.edit(**kw)
    kw = {'description':self.description3}
    person1_c.edit(**kw)
    self.synchronize(self.sub_id1)
    for sub in portal_sync.getSubscriptionList():
      self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY)

1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244
  def testUpdateLocalPermission(self, quiet=0, run=run_all_test):
    """
    We will do a first synchronization, then modify, add and delete
    an user role and see if it is correctly synchronized
    """
    if not run: return
    self.testFirstSynchronization(quiet=1,run=1)
    if not quiet:
      ZopeTestCase._print('\nTest Update Local Permission ')
      LOG('Testing... ',0,'testUpdateLocalPermission')
    # then create roles
    person_server = self.getPersonServer()
    person1_s = person_server._getOb(self.id1)
    person2_s = person_server._getOb(self.id2)
    person_client1 = self.getPersonClient1()
    person1_c = person_client1._getOb(self.id1)
    person2_c = person_client1._getOb(self.id2)
    person1_s.manage_setLocalPermissions('View',['Manager','Owner'])
    person2_s.manage_setLocalPermissions('View',['Manager','Owner'])
    person2_s.manage_setLocalPermissions('View management screens',['Owner',])
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    role_1_s = person1_s.get_local_permissions()
    role_2_s = person2_s.get_local_permissions()
    role_1_c = person1_c.get_local_permissions()
    role_2_c = person2_c.get_local_permissions()
    self.assertEqual(role_1_s,role_1_c)
    self.assertEqual(role_2_s,role_2_c)
    person1_s.manage_setLocalPermissions('View',['Owner'])
    person2_s.manage_setLocalPermissions('View',None)
    person2_s.manage_setLocalPermissions('View management screens',())
    self.synchronize(self.sub_id1)
    self.synchronize(self.sub_id2)
    role_1_s = person1_s.get_local_permissions()
    role_2_s = person2_s.get_local_permissions()
    role_1_c = person1_c.get_local_permissions()
    role_2_c = person2_c.get_local_permissions()
    self.assertEqual(role_1_s,role_1_c)
    self.assertEqual(role_2_s,role_2_c)

1245 1246 1247 1248
  # We may add a test in order to check if the slow_sync mode works fine, ie
  # if we do have both object on the client and server side, we must make sure
  # that the server first sends is own data

Sebastien Robin's avatar
Sebastien Robin committed
1249 1250 1251 1252 1253 1254 1255 1256 1257
if __name__ == '__main__':
    framework()
else:
    import unittest
    def test_suite():
        suite = unittest.TestSuite()
        suite.addTest(unittest.makeSuite(TestERP5SyncML))
        return suite