SynchronizationTool.py 35.4 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
## Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

27
"""
Jean-Paul Smets's avatar
Jean-Paul Smets committed
28 29 30 31
ERP portal_synchronizations tool.
"""

from OFS.SimpleItem import SimpleItem
32 33
from Products.ERP5Type.Document.Folder import Folder
from Products.ERP5Type.Base import Base
Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35 36 37 38 39
from Products.CMFCore.utils import UniqueObject
from Globals import InitializeClass, DTMLFile, PersistentMapping, Persistent
from AccessControl import ClassSecurityInfo, getSecurityManager
from Products.CMFCore import CMFCorePermissions
from Products.ERP5SyncML import _dtmldir
from Publication import Publication,Subscriber
40
from Products.BTreeFolder2.BTreeFolder2 import BTreeFolder2
Jean-Paul Smets's avatar
Jean-Paul Smets committed
41 42
from Subscription import Subscription,Signature
from xml.dom.ext.reader.Sax2 import FromXmlStream, FromXml
Sebastien Robin's avatar
Sebastien Robin committed
43
from xml.dom.minidom import parse, parseString
Sebastien Robin's avatar
Sebastien Robin committed
44
from Products.ERP5Type import Permissions
Jean-Paul Smets's avatar
Jean-Paul Smets committed
45 46
from PublicationSynchronization import PublicationSynchronization
from SubscriptionSynchronization import SubscriptionSynchronization
47
from Products.CMFCore.utils import getToolByName
48
from AccessControl.SecurityManagement import newSecurityManager
49
from AccessControl.SecurityManagement import noSecurityManager
50
from AccessControl.User import UnrestrictedUser
Sebastien Robin's avatar
Sebastien Robin committed
51
from Acquisition import aq_base
52
from xml.parsers.expat import ExpatError # parseString error
53
import urllib
54
import urllib2
55
import socket
56
import os
Jean-Paul Smets's avatar
Jean-Paul Smets committed
57
import string
58 59
import commands
import random
60
from zLOG import LOG
Jean-Paul Smets's avatar
Jean-Paul Smets committed
61

62

Jean-Paul Smets's avatar
Jean-Paul Smets committed
63 64
from Conduit.ERP5Conduit import ERP5Conduit

65 66
class SynchronizationTool( SubscriptionSynchronization, PublicationSynchronization, 
                           UniqueObject, Folder, Base):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
67 68 69 70 71 72 73 74
  """
    This tool implements the synchronization algorithm
  """


  id       = 'portal_synchronizations'
  meta_type    = 'ERP5 Synchronizations'

75 76 77 78
  # On the server, this is use to keep track of the temporary
  # copies.
  objectsToRemove = [] 
  
Jean-Paul Smets's avatar
Jean-Paul Smets committed
79 80 81 82 83 84 85 86 87 88 89 90 91
  security = ClassSecurityInfo()

  #
  #  Default values.
  #
  list_publications = PersistentMapping()
  list_subscriptions = PersistentMapping()

  # Do we want to use emails ?
  #email = None
  email = 1
  same_export = 1

92 93 94 95
  # Multiple inheritance inconsistency caused by Base must be circumvented
  def __init__( self, *args, **kwargs ):
    Folder.__init__(self, self.id, **kwargs)

Jean-Paul Smets's avatar
Jean-Paul Smets committed
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112

  #
  #  ZMI methods
  #
  manage_options = ( ( { 'label'   : 'Overview'
             , 'action'   : 'manage_overview'
             }
            , { 'label'   : 'Publications'
             , 'action'   : 'managePublications'
             }
            , { 'label'   : 'Subscriptions'
             , 'action'   : 'manageSubscriptions'
             }
            , { 'label'   : 'Conflicts'
             , 'action'   : 'manageConflicts'
             }
            )
113
           + Folder.manage_options
Jean-Paul Smets's avatar
Jean-Paul Smets committed
114 115 116 117 118 119 120 121 122 123 124
           )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manage_overview' )
  manage_overview = DTMLFile( 'dtml/explainSynchronizationTool', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'managePublications' )
  managePublications = DTMLFile( 'dtml/managePublications', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
125 126
               , 'manage_addPublicationForm' )
  manage_addPublicationForm = DTMLFile( 'dtml/manage_addPublication', globals() )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
127 128 129 130 131 132 133 134 135 136

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manageSubsciptions' )
  manageSubscriptions = DTMLFile( 'dtml/manageSubscriptions', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'manageConflicts' )
  manageConflicts = DTMLFile( 'dtml/manageConflicts', globals() )

  security.declareProtected( CMFCorePermissions.ManagePortal
137 138
               , 'manage_addSubscriptionForm' )
  manage_addSubscriptionForm = DTMLFile( 'dtml/manage_addSubscription', globals() )
Jean-Paul Smets's avatar
Jean-Paul Smets committed
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158

  security.declareProtected( CMFCorePermissions.ManagePortal
               , 'editProperties' )
  def editProperties( self
           , publisher=None
           , REQUEST=None
           ):
    """
      Form handler for "tool-wide" properties (including list of
      metadata elements).
    """
    if publisher is not None:
      self.publisher = publisher

    if REQUEST is not None:
      REQUEST[ 'RESPONSE' ].redirect( self.absolute_url()
                    + '/propertiesForm'
                    + '?manage_tabs_message=Tool+updated.'
                    )

159
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_addPublication')
160
  def manage_addPublication(self, title, publication_url, destination_path,
161
            query, xml_mapping, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
162 163 164
    """
      create a new publication
    """
165 166 167
    #if not('publications' in self.objectIds()):
    #  publications = Folder('publications')
    #  self._setObject(publications.id, publications)
168
    folder = self.getObjectContainer()
169 170
    new_id = self.getPublicationIdFromTitle(title)
    pub = Publication(new_id, title, publication_url, destination_path,
171
                      query, xml_mapping, gpg_key)
172
    folder._setObject( new_id, pub )
173 174 175
    #if len(self.list_publications) == 0:
    #  self.list_publications = PersistentMapping()
    #self.list_publications[id] = pub
Jean-Paul Smets's avatar
Jean-Paul Smets committed
176 177 178
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

179
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_addSubscription')
180
  def manage_addSubscription(self, title, publication_url, subscription_url,
181
                       destination_path, query, xml_mapping, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
182
    """
Sebastien Robin's avatar
Sebastien Robin committed
183
      XXX should be renamed as addSubscription
Jean-Paul Smets's avatar
Jean-Paul Smets committed
184 185
      create a new subscription
    """
186 187 188
    #if not('subscriptions' in self.objectIds()):
    #  subscriptions = Folder('subscriptions')
    #  self._setObject(subscriptions.id, subscriptions)
189
    folder = self.getObjectContainer()
190 191
    new_id = self.getSubscriptionIdFromTitle(title)
    sub = Subscription(new_id, title, publication_url, subscription_url,
192
                       destination_path, query, xml_mapping, gpg_key)
193
    folder._setObject( new_id, sub )
194 195 196
    #if len(self.list_subscriptions) == 0:
    #  self.list_subscriptions = PersistentMapping()
    #self.list_subscriptions[id] = sub
Jean-Paul Smets's avatar
Jean-Paul Smets committed
197 198 199
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

200
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_editPublication')
201
  def manage_editPublication(self, title, publication_url, destination_path,
202
                       query, xml_mapping, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
203 204 205
    """
      modify a publication
    """
206
    pub = self.getPublication(title)
207 208 209 210 211 212
    pub.setTitle(title)
    pub.setPublicationUrl(publication_url)
    pub.setDestinationPath(destination_path)
    pub.setQuery(query)
    pub.setXMLMapping(xml_mapping)
    pub.setGPGKey(gpg_key)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
213 214 215
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

216
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_editSubscription')
217
  def manage_editSubscription(self, title, publication_url, subscription_url,
218
             destination_path, query, xml_mapping, gpg_key, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
219 220 221
    """
      modify a subscription
    """
222
    sub = self.getSubscription(title)
223 224 225 226 227 228 229
    sub.setTitle(title)
    sub.setPublicationUrl(publication_url)
    sub.setDestinationPath(destination_path)
    sub.setQuery(query)
    sub.setXMLMapping(xml_mapping)
    sub.setGPGKey(gpg_key)
    sub.setSubscriptionUrl(subscription_url)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
230 231 232
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

233
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_deletePublication')
234
  def manage_deletePublication(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
235 236 237
    """
      delete a publication
    """
238
    id = self.getPublicationIdFromTitle(title)
239 240
    folder = self.getObjectContainer()
    folder._delObject(id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
241 242 243
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

244
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_deleteSubscription')
245
  def manage_deleteSubscription(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
246 247 248
    """
      delete a subscription
    """
249
    id = self.getSubscriptionIdFromTitle(title)
250 251
    folder = self.getObjectContainer()
    folder._delObject(id)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
252 253 254
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

255
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_resetPublication')
256
  def manage_resetPublication(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
257 258 259
    """
      reset a publication
    """
260
    pub = self.getPublication(title)
261
    pub.resetAllSubscribers()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
262 263 264
    if RESPONSE is not None:
      RESPONSE.redirect('managePublications')

265
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_resetSubscription')
266
  def manage_resetSubscription(self, title, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
267 268 269
    """
      reset a subscription
    """
270
    sub = self.getSubscription(title)
271 272
    sub.resetAllSignatures()
    sub.resetAnchors()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
273 274 275
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

276 277 278 279 280 281 282 283 284
  security.declareProtected(Permissions.ModifyPortalContent, 'manage_syncSubscription')
  def manage_syncSubscription(self, title, RESPONSE=None):
    """
      reset a subscription
    """
    self.SubSync(title)
    if RESPONSE is not None:
      RESPONSE.redirect('manageSubscriptions')

Sebastien Robin's avatar
Sebastien Robin committed
285
  security.declareProtected(Permissions.AccessContentsInformation,'getPublicationList')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
286 287 288 289
  def getPublicationList(self):
    """
      Return a list of publications
    """
290 291
    folder = self.getObjectContainer()
    object_list = folder.objectValues()
292 293
    object_list = filter(lambda x: x.id.find('pub')==0,object_list)
    return object_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
294

295
  security.declareProtected(Permissions.AccessContentsInformation,'getPublication')
296
  def getPublication(self, title):
297
    """
298
      Return the  publications with this id
299
    """
300 301 302
    for p in self.getPublicationList():
      if p.getTitle() == title:
        return p
303
    return None
304

305 306 307 308 309 310 311 312 313 314 315 316
  security.declareProtected(Permissions.AccessContentsInformation,'getObjectContainer')
  def getObjectContainer(self):
    """
    this returns the external mount point if there is one
    """
    folder = self
    portal_url = getToolByName(self,'portal_url')
    root = portal_url.getPortalObject().aq_parent
    if 'external_mount_point' in root.objectIds():
      folder = root.external_mount_point
    return folder

Sebastien Robin's avatar
Sebastien Robin committed
317
  security.declareProtected(Permissions.AccessContentsInformation,'getSubscriptionList')
Jean-Paul Smets's avatar
Jean-Paul Smets committed
318 319 320 321
  def getSubscriptionList(self):
    """
      Return a list of publications
    """
322 323
    folder = self.getObjectContainer()
    object_list = folder.objectValues()
324 325
    object_list = filter(lambda x: x.id.find('sub')==0,object_list)
    return object_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
326

327
  def getSubscription(self, title):
328 329 330
    """
      Returns the subscription with this id
    """
331 332 333
    for s in self.getSubscriptionList():
      if s.getTitle() == title:
        return s
334 335 336
    return None


Sebastien Robin's avatar
Sebastien Robin committed
337
  security.declareProtected(Permissions.AccessContentsInformation,'getSynchronizationList')
338
  def getSynchronizationList(self):
339 340
    """
      Returns the list of subscriptions and publications
Sebastien Robin's avatar
Sebastien Robin committed
341

342 343 344
    """
    return self.getSubscriptionList() + self.getPublicationList()

Sebastien Robin's avatar
Sebastien Robin committed
345
  security.declareProtected(Permissions.AccessContentsInformation,'getSubscriberList')
346 347 348 349 350 351 352 353 354 355
  def getSubscriberList(self):
    """
      Returns the list of subscribers and subscriptions
    """
    s_list = []
    s_list += self.getSubscriptionList()
    for publication in self.getPublicationList():
      s_list += publication.getSubscriberList()
    return s_list

Sebastien Robin's avatar
Sebastien Robin committed
356
  security.declareProtected(Permissions.AccessContentsInformation,'getConflictList')
357
  def getConflictList(self, context=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
358 359 360 361
    """
    Retrieve the list of all conflicts
    Here the list is as follow :
    [conflict_1,conflict2,...] where conflict_1 is like:
362
    ['publication',publication_id,object.getPath(),property_id,publisher_value,subscriber_value]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
363
    """
364
    path = self.resolveContext(context)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
365 366
    conflict_list = []
    for publication in self.getPublicationList():
Sebastien Robin's avatar
Sebastien Robin committed
367 368 369 370
      for subscriber in publication.getSubscriberList():
        sub_conflict_list = subscriber.getConflictList()
        for conflict in sub_conflict_list:
          #conflict.setDomain('Publication')
371
          conflict.setSubscriber(subscriber)
Sebastien Robin's avatar
Sebastien Robin committed
372 373
          #conflict.setDomainId(subscriber.getId())
          conflict_list += [conflict.__of__(self)]
Jean-Paul Smets's avatar
Jean-Paul Smets committed
374 375 376
    for subscription in self.getSubscriptionList():
      sub_conflict_list = subscription.getConflictList()
      for conflict in sub_conflict_list:
377
        #conflict.setDomain('Subscription')
378
        conflict.setSubscriber(subscription)
Sebastien Robin's avatar
Sebastien Robin committed
379 380
        #conflict.setDomainId(subscription.getId())
        conflict_list += [conflict.__of__(self)]
381 382 383 384
    if path is not None: # Retrieve only conflicts for a given path
      new_list = []
      for conflict in conflict_list:
        if conflict.getObjectPath() == path:
Sebastien Robin's avatar
Sebastien Robin committed
385
          new_list += [conflict.__of__(self)]
386
      return new_list
Jean-Paul Smets's avatar
Jean-Paul Smets committed
387 388
    return conflict_list

389 390 391 392 393 394 395 396 397
  security.declareProtected(Permissions.AccessContentsInformation,'getDocumentConflictList')
  def getDocumentConflictList(self, context=None):
    """
    Retrieve the list of all conflicts for a given document
    Well, this is the same thing as getConflictList with a path
    """
    return self.getConflictList(context)


Sebastien Robin's avatar
Sebastien Robin committed
398
  security.declareProtected(Permissions.AccessContentsInformation,'getSynchronizationState')
399
  def getSynchronizationState(self, context):
400
    """
401
    context : the context on which we are looking for state
402

403 404 405
    This functions have to retrieve the synchronization state,
    it will first look in the conflict list, if nothing is found,
    then we have to check on a publication/subscription.
406

407
    This method returns a mapping between subscription and states
Sebastien Robin's avatar
Sebastien Robin committed
408 409 410 411 412

    JPS suggestion:
      path -> object, document, context, etc.
      type -> '/titi/toto' or ('','titi', 'toto') or <Base instance 1562567>
      object = self.resolveContext(context) (method to add)
413
    """
414
    path = self.resolveContext(context)
415 416 417 418 419 420
    conflict_list = self.getConflictList()
    state_list= []
    LOG('getSynchronizationState',0,'path: %s' % str(path))
    for conflict in conflict_list:
      if conflict.getObjectPath() == path:
        LOG('getSynchronizationState',0,'found a conflict: %s' % str(conflict))
421
        state_list += [[conflict.getSubscriber(),self.CONFLICT]]
422
    for domain in self.getSynchronizationList():
423 424 425 426 427 428 429 430 431 432 433 434
      destination = domain.getDestinationPath()
      LOG('getSynchronizationState',0,'destination: %s' % str(destination))
      j_path = '/'.join(path)
      LOG('getSynchronizationState',0,'j_path: %s' % str(j_path))
      if j_path.find(destination)==0:
        o_id = j_path[len(destination)+1:].split('/')[0]
        LOG('getSynchronizationState',0,'o_id: %s' % o_id)
        subscriber_list = []
        if domain.domain_type==self.PUB:
          subscriber_list = domain.getSubscriberList()
        else:
          subscriber_list = [domain]
435
        LOG('getSynchronizationState, subscriber_list:',0,subscriber_list)
436 437 438 439
        for subscriber in subscriber_list:
          signature = subscriber.getSignature(o_id)
          if signature is not None:
            state = signature.getStatus()
440 441
            LOG('getSynchronizationState:',0,'sub.dest :%s, state: %s' % \
                                   (subscriber.getSubscriptionUrl(),str(state)))
442 443 444 445 446 447 448 449
            found = None
            # Make sure there is not already a conflict giving the state
            for state_item in state_list:
              if state_item[0]==subscriber:
                found = 1
            if found is None:
              state_list += [[subscriber,state]]
    return state_list
450

451 452
  security.declareProtected(Permissions.ModifyPortalContent, 'applyPublisherValue')
  def applyPublisherValue(self, conflict):
Sebastien Robin's avatar
Sebastien Robin committed
453 454 455 456 457
    """
      after a conflict resolution, we have decided
      to keep the local version of an object
    """
    object = self.unrestrictedTraverse(conflict.getObjectPath())
458
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
459
    # get the signature:
Sebastien Robin's avatar
Sebastien Robin committed
460
    LOG('p_sync.applyPublisherValue, subscriber: ',0,subscriber)
Sebastien Robin's avatar
Sebastien Robin committed
461 462 463
    signature = subscriber.getSignature(object.getId()) # XXX may be change for rid
    signature.delConflict(conflict)
    if signature.getConflictList() == []:
Sebastien Robin's avatar
Sebastien Robin committed
464
      LOG('p_sync.applyPublisherValue, conflict_list empty on : ',0,signature)
465 466 467 468 469
      # Delete the copy of the object if the there is one
      directory = object.aq_parent
      copy_id = object.id + '_conflict_copy'
      if copy_id in directory.objectIds():
        directory._delObject(copy_id)
Sebastien Robin's avatar
Sebastien Robin committed
470 471
      signature.setStatus(self.PUB_CONFLICT_MERGE)

472 473 474 475 476 477
  security.declareProtected(Permissions.ModifyPortalContent, 'applyPublisherDocument')
  def applyPublisherDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
478
    LOG('applyPublisherDocument, subscriber: ',0,subscriber)
479 480
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
Sebastien Robin's avatar
Sebastien Robin committed
481
        LOG('applyPublisherDocument, applying on conflict: ',0,conflict)
482 483
        c.applyPublisherValue()

484
  security.declareProtected(Permissions.AccessContentsInformation, 'getPublisherDocumentPath')
485 486 487 488 489 490 491
  def getPublisherDocumentPath(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
    return conflict.getObjectPath()

492
  security.declareProtected(Permissions.AccessContentsInformation, 'getPublisherDocument')
493 494 495 496 497 498 499 500 501 502
  def getPublisherDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    publisher_object_path = self.getPublisherDocumentPath(conflict)
    LOG('getPublisherDocument publisher_object_path',0,publisher_object_path)
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    LOG('getPublisherDocument publisher_object',0,publisher_object)
    return publisher_object

503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531

  def getSubscriberDocumentVersion(self, conflict, docid):
    """
    Given a 'conflict' and a 'docid' refering to a new version of a
    document, applies the conflicting changes to the document's new
    version. By so, two differents versions of the same document will be
    available.
    Thus, the manager will be able to open both version of the document
    before selecting which one to keep.
    """
    
    subscriber = conflict.getSubscriber()
    publisher_object_path = conflict.getObjectPath()
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    publisher_xml = self.getXMLObject(object=publisher_object,xml_mapping\
                                            = subscriber.getXMLMapping())

    directory = publisher_object.aq_parent
    object_id = docid
    if object_id in directory.objectIds():
        directory._delObject(object_id)
        conduit = ERP5Conduit()
        conduit.addNode(xml=publisher_xml,object=directory,object_id=object_id)
        subscriber_document = directory._getOb(object_id)
        for c in self.getConflictList(conflict.getObjectPath()):
            if c.getSubscriber() == subscriber:
                c.applySubscriberValue(object=subscriber_document)
        return subscriber_document

532
  security.declareProtected(Permissions.AccessContentsInformation, 'getSubscriberDocumentPath')
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
  def getSubscriberDocumentPath(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
    publisher_object_path = conflict.getObjectPath()
    publisher_object = self.unrestrictedTraverse(publisher_object_path)
    publisher_xml = self.getXMLObject(object=publisher_object,xml_mapping = subscriber.getXMLMapping())
    directory = publisher_object.aq_parent
    object_id = publisher_object.id + '_conflict_copy'
    if object_id in directory.objectIds():
      directory._delObject(object_id)
    conduit = ERP5Conduit()
    conduit.addNode(xml=publisher_xml,object=directory,object_id=object_id)
    subscriber_document = directory._getOb(object_id)
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
        c.applySubscriberValue(object=subscriber_document)
    return subscriber_document.getPhysicalPath()

553
  security.declareProtected(Permissions.AccessContentsInformation, 'getSubscriberDocument')
554 555 556 557 558 559 560 561
  def getSubscriberDocument(self, conflict):
    """
    apply the publisher value for all conflict of the given document
    """
    subscriber_object_path = self.getSubscriberDocumentPath(conflict)
    subscriber_object = self.unrestrictedTraverse(subscriber_object_path)
    return subscriber_object

562 563 564 565 566 567 568 569 570 571 572
  security.declareProtected(Permissions.ModifyPortalContent, 'applySubscriberDocument')
  def applySubscriberDocument(self, conflict):
    """
    apply the subscriber value for all conflict of the given document
    """
    subscriber = conflict.getSubscriber()
    for c in self.getConflictList(conflict.getObjectPath()):
      if c.getSubscriber() == subscriber:
        c.applySubscriberValue()

  security.declareProtected(Permissions.ModifyPortalContent, 'applySubscriberValue')
573
  def applySubscriberValue(self, conflict,object=None):
Sebastien Robin's avatar
Sebastien Robin committed
574 575 576 577
    """
      after a conflict resolution, we have decided
      to keep the local version of an object
    """
578 579 580 581 582 583 584
    solve_conflict = 1
    if object is None:
      object = self.unrestrictedTraverse(conflict.getObjectPath())
    else:
      # This means an object was given, this is used in order
      # to see change on a copy, so don't solve conflict
      solve_conflict=0
585
    subscriber = conflict.getSubscriber()
Sebastien Robin's avatar
Sebastien Robin committed
586 587 588 589 590 591
    # get the signature:
    LOG('p_sync.setRemoteObject, subscriber: ',0,subscriber)
    signature = subscriber.getSignature(object.getId()) # XXX may be change for rid
    conduit = ERP5Conduit()
    for xupdate in conflict.getXupdateList():
      conduit.updateNode(xml=xupdate,object=object,force=1)
592 593 594
    if solve_conflict:
      signature.delConflict(conflict)
      if signature.getConflictList() == []:
595 596 597 598 599
        # Delete the copy of the object if the there is one
        directory = object.aq_parent
        copy_id = object.id + '_conflict_copy'
        if copy_id in directory.objectIds():
          directory._delObject(copy_id)
600
        signature.setStatus(self.PUB_CONFLICT_MERGE)
Sebastien Robin's avatar
Sebastien Robin committed
601 602 603


  security.declareProtected(Permissions.ModifyPortalContent, 'manageLocalValue')
604
  def managePublisherValue(self, subscription_url, property_id, object_path, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
605 606 607
    """
    Do whatever needed in order to store the local value on
    the remote server
Sebastien Robin's avatar
Sebastien Robin committed
608 609 610 611 612

    Suggestion (API)
      add method to view document with applied xupdate
      of a given subscriber XX (ex. viewSubscriberDocument?path=ddd&subscriber_id=dddd)
      Version=Version CPS
Jean-Paul Smets's avatar
Jean-Paul Smets committed
613 614
    """
    # Retrieve the conflict object
Sebastien Robin's avatar
Sebastien Robin committed
615
    LOG('manageLocalValue',0,'%s %s %s' % (str(subscription_url),
616
                                           str(property_id),
Sebastien Robin's avatar
Sebastien Robin committed
617 618 619
                                           str(object_path)))
    for conflict in self.getConflictList():
      LOG('manageLocalValue, conflict:',0,conflict)
620 621
      if conflict.getPropertyId() == property_id:
        LOG('manageLocalValue',0,'found the property_id')
Sebastien Robin's avatar
Sebastien Robin committed
622
        if '/'.join(conflict.getObjectPath())==object_path:
623
          if conflict.getSubscriber().getSubscriptionUrl()==subscription_url:
624
            conflict.applyPublisherValue()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
625 626 627
    if RESPONSE is not None:
      RESPONSE.redirect('manageConflicts')

Sebastien Robin's avatar
Sebastien Robin committed
628
  security.declareProtected(Permissions.ModifyPortalContent, 'manageRemoteValue')
629
  def manageSubscriberValue(self, subscription_url, property_id, object_path, RESPONSE=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
630 631 632 633
    """
    Do whatever needed in order to store the remote value locally
    and confirmed that the remote box should keep it's value
    """
Sebastien Robin's avatar
Sebastien Robin committed
634
    LOG('manageLocalValue',0,'%s %s %s' % (str(subscription_url),
635
                                           str(property_id),
Sebastien Robin's avatar
Sebastien Robin committed
636 637 638
                                           str(object_path)))
    for conflict in self.getConflictList():
      LOG('manageLocalValue, conflict:',0,conflict)
639 640
      if conflict.getPropertyId() == property_id:
        LOG('manageLocalValue',0,'found the property_id')
Sebastien Robin's avatar
Sebastien Robin committed
641
        if '/'.join(conflict.getObjectPath())==object_path:
642
          if conflict.getSubscriber().getSubscriptionUrl()==subscription_url:
643
            conflict.applySubscriberValue()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
644 645 646
    if RESPONSE is not None:
      RESPONSE.redirect('manageConflicts')

647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663
  def resolveContext(self, context):
    """
    We try to return a path (like ('','erp5','foo') from the context.
    Context can be :
      - a path
      - an object
      - a string representing a path
    """
    if context is None:
      return context
    elif type(context) is type(()):
      return context
    elif type(context) is type('a'):
      return tuple(context.split('/'))
    else:
      return context.getPhysicalPath()

664
  security.declarePublic('sendResponse')
665
  def sendResponse(self, to_url=None, from_url=None, sync_id=None,xml=None, domain=None, send=1):
666 667 668 669
    """
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """
670
    LOG('sendResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
671 672 673 674
    LOG('sendResponse, to_url: ',0,to_url)
    LOG('sendResponse, from_url: ',0,from_url)
    LOG('sendResponse, sync_id: ',0,sync_id)
    LOG('sendResponse, xml: ',0,xml)
675 676 677 678 679 680 681
    if domain is not None:
      gpg_key = domain.getGPGKey()
      if gpg_key not in ('',None):
        filename = str(random.randrange(1,2147483600)) + '.txt'
        decrypted = file('/tmp/%s' % filename,'w')
        decrypted.write(xml)
        decrypted.close()
682 683
        (status,output)=commands.getstatusoutput('gzip /tmp/%s' % filename)
        (status,output)=commands.getstatusoutput('gpg --yes --homedir /var/lib/zope/Products/ERP5SyncML/gnupg_keys -r "%s" -se /tmp/%s.gz' % (gpg_key,filename))
684
        LOG('readResponse, gpg output:',0,output)
685
        encrypted = file('/tmp/%s.gz.gpg' % filename,'r')
686 687
        xml = encrypted.read()
        encrypted.close()
688 689 690 691 692
        commands.getstatusoutput('rm -f /tmp/%s.gz' % filename)
        commands.getstatusoutput('rm -f /tmp/%s.gz.gpg' % filename)
    if send:
      if type(to_url) is type('a'):
        if to_url.find('http://')==0:
693 694 695
          # XXX Make sure this is not a problem
          if domain.domain_type == self.PUB:
            return None
696
          # we will send an http response
Sebastien Robin's avatar
Sebastien Robin committed
697
          domain = aq_base(domain)
698
          LOG('sendResponse, will start sendHttpResponse, xml',0,xml)
699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714
          self.activate(activity='RAMQueue').sendHttpResponse(sync_id=sync_id,
                                           to_url=to_url,
                                           xml=xml, domain=domain)
          return None
        elif to_url.find('file://')==0:
          filename = to_url[len('file:/'):]
          stream = file(filename,'w')
          LOG('sendResponse, filename: ',0,filename)
          stream.write(xml)
          stream.close()
          # we have to use local files (unit testing for example
        elif to_url.find('mailto:')==0:
          # we will send an email
          to_address = to_url[len('mailto:'):]
          from_address = from_url[len('mailto:'):]
          self.sendMail(from_address,to_address,sync_id,xml)
715
    return xml
716 717

  security.declarePrivate('sendHttpResponse')
718
  def sendHttpResponse(self, to_url=None, sync_id=None, xml=None, domain=None ):
719
    LOG('sendHttpResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
720
    LOG('sendHttpResponse, starting with domain:',0,domain)
Sebastien Robin's avatar
Sebastien Robin committed
721
    #LOG('sendHttpResponse, xml:',0,xml)
722 723 724
    if domain is not None:
      if domain.domain_type == self.PUB:
        return xml
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
    # Retrieve the proxy from os variables
    proxy_url = ''
    if os.environ.has_key('http_proxy'):
      proxy_url = os.environ['http_proxy']
    LOG('sendHttpResponse, proxy_url:',0,proxy_url)
    if proxy_url !='':
      proxy_handler = urllib2.ProxyHandler({"http" :proxy_url})
    else:
      proxy_handler = urllib2.ProxyHandler({})
    pass_mgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
    auth_handler = urllib2.HTTPBasicAuthHandler(pass_mgr)
    proxy_auth_handler = urllib2.ProxyBasicAuthHandler(pass_mgr)
    opener = urllib2.build_opener(proxy_handler, proxy_auth_handler,auth_handler,urllib2.HTTPHandler)
    urllib2.install_opener(opener)
    to_encode = {'text':xml,'sync_id':sync_id}
740
    encoded = urllib.urlencode(to_encode)
741 742
    if to_url.find('readResponse')<0:
      to_url = to_url + '/portal_synchronizations/readResponse'
743
    request = urllib2.Request(url=to_url,data=encoded)
744 745 746 747 748 749 750 751
    #result = urllib2.urlopen(request).read()
    try:
      result = urllib2.urlopen(request).read()
    except socket.error, msg:
      self.activate(activity='RAMQueue').sendHttpResponse(to_url=to_url,sync_id=sync_id,xml=xml,domain=domain)
      LOG('sendHttpResponse, socket ERROR:',0,msg)
      return

752
    
753
    LOG('sendHttpResponse, before result, domain:',0,domain)
Sebastien Robin's avatar
Sebastien Robin committed
754
    #LOG('sendHttpResponse, result:',0,result)
755 756
    if domain is not None:
      if domain.domain_type == self.SUB:
757
        gpg_key = domain.getGPGKey()
758
        if result not in (None,''):
759 760
          #if gpg_key not in ('',None):
          #  result = self.sendResponse(domain=domain,xml=result,send=0)
761 762 763
          uf = self.acl_users
          user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
          newSecurityManager(None, user)
764 765
          #self.activate(activity='RAMQueue').readResponse(sync_id=sync_id,text=result)
          self.readResponse(sync_id=sync_id,text=result)
766 767 768 769 770 771 772 773 774 775 776 777 778 779 780

  security.declarePublic('sync')
  def sync(self):
    """
    This will try to synchronize every subscription
    """
    # Login as a manager to make sure we can create objects
    uf = self.acl_users
    user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
    newSecurityManager(None, user)
    message_list = self.portal_activities.getMessageList()
    LOG('sync, message_list:',0,message_list)
    if len(message_list) == 0:
      for subscription in self.getSubscriptionList():
        LOG('sync, subcription:',0,subscription)
781
        self.activate(activity='RAMQueue').SubSync(subscription.getTitle())
782 783 784 785 786 787 788 789

  security.declarePublic('readResponse')
  def readResponse(self, text=None, sync_id=None, to_url=None, from_url=None):
    """
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """
    LOG('readResponse, ',0,'starting')
790
    LOG('readResponse, self.getPhysicalPath: ',0,self.getPhysicalPath())
791
    LOG('readResponse, sync_id: ',0,sync_id)
Sebastien Robin's avatar
Sebastien Robin committed
792
    #LOG('readResponse, text:',0,text)
793 794 795 796 797
    # Login as a manager to make sure we can create objects
    uf = self.acl_users
    user = UnrestrictedUser('syncml','syncml',['Manager','Member'],'')
    newSecurityManager(None, user)

798
    if text is not None:
799 800 801 802 803
      # XXX We will look everywhere for a publication/subsription with
      # the id sync_id, this is not so good, but there is no way yet
      # to know if we will call a publication or subscription XXX
      gpg_key = ''
      for publication in self.getPublicationList():
804
        if publication.getTitle()==sync_id:
805 806 807
          gpg_key = publication.getGPGKey()
      if gpg_key == '':
        for subscription in self.getSubscriptionList():
808
          if subscription.getTitle()==sync_id:
809 810 811 812
            gpg_key = subscription.getGPGKey()
      # decrypt the message if needed
      if gpg_key not in (None,''):
        filename = str(random.randrange(1,2147483600)) + '.txt'
813
        encrypted = file('/tmp/%s.gz.gpg' % filename,'w')
814 815
        encrypted.write(text)
        encrypted.close()
816
        (status,output)=commands.getstatusoutput('gpg --homedir /var/lib/zope/Products/ERP5SyncML/gnupg_keys -r "%s"  --decrypt /tmp/%s.gz.gpg > /tmp/%s.gz' % (gpg_key,filename,filename))
817
        LOG('readResponse, gpg output:',0,output)
818
        (status,output)=commands.getstatusoutput('gunzip /tmp/%s.gz' % filename)
819 820
        decrypted = file('/tmp/%s' % filename,'r')
        text = decrypted.read()
821
        LOG('readResponse, text:',0,text)
822 823
        decrypted.close()
        commands.getstatusoutput('rm -f /tmp/%s' % filename)
824
        commands.getstatusoutput('rm -f /tmp/%s.gz.gpg' % filename)
825 826
      # Get the target and then find the corresponding publication or
      # Subscription
827
      LOG('readResponse, xml before parseSTring',0,text)
Sebastien Robin's avatar
Sebastien Robin committed
828
      xml = parseString(text)
829 830 831 832 833 834 835 836 837
      url = ''
      for subnode in self.getElementNodeList(xml):
        if subnode.nodeName == 'SyncML':
          for subnode1 in self.getElementNodeList(subnode):
            if subnode1.nodeName == 'SyncHdr':
              for subnode2 in self.getElementNodeList(subnode1):
                if subnode2.nodeName == 'Target':
                  url = subnode2.childNodes[0].data 
      for publication in self.getPublicationList():
838
        if publication.getPublicationUrl()==url and publication.getTitle()==sync_id:
839
          result = self.PubSync(sync_id,xml)
840 841 842 843
          # Then encrypt the message
          xml = result['xml']
          xml = self.sendResponse(xml=xml,domain=publication,send=0)
          return xml
844
      for subscription in self.getSubscriptionList():
845
        if subscription.getSubscriptionUrl()==url and subscription.getTitle()==sync_id:
846 847
          result = self.activate(activity='RAMQueue').SubSync(sync_id,xml)
          #result = self.SubSync(sync_id,xml)
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863

    # we use from only if we have a file 
    elif type(from_url) is type('a'):
      if from_url.find('file://')==0:
        try:
          filename = from_url[len('file:/'):]
          stream = file(filename,'r')
          xml = stream.read()
          #stream.seek(0)
          #LOG('readResponse',0,'Starting... msg: %s' % str(stream.read()))
        except IOError:
          LOG('readResponse, cannot read file: ',0,filename)
          xml = None
        if xml is not None and len(xml)==0:
          xml = None
        return xml
864

865 866 867 868 869 870 871 872 873 874 875 876 877 878
  security.declareProtected(Permissions.ModifyPortalContent, 'getPublicationIdFromTitle')
  def getPublicationIdFromTitle(self, title):
    """
    simply return an id from a title
    """
    return 'pub_' + title

  security.declareProtected(Permissions.ModifyPortalContent, 'getPublicationIdFromTitle')
  def getSubscriptionIdFromTitle(self, title):
    """
    simply return an id from a title
    """
    return 'sub_' + title

879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896
#  security.declarePrivate('notify_sync')
#  def notify_sync(self, event_type, object, infos):
#    """Notification from the event service.
#
#    # XXX very specific to cps
#
#    Called when an object is added/deleted/modified.
#    Update the date of sync
#    """
#    from Products.CPSCore.utils import _isinstance
#    from Products.CPSCore.ProxyBase import ProxyBase
#
#    if event_type in ('sys_modify_object',
#                      'modify_object'):
#      if not(_isinstance(object, ProxyBase)):
#        repotool = getToolByName(self, 'portal_repository')
#        if repotool.isObjectInRepository(object):
#          object_id = object.getId()
897 898


Jean-Paul Smets's avatar
Jean-Paul Smets committed
899
InitializeClass( SynchronizationTool )