19.5 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480
# -*- coding: utf-8 -*-
## Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <>
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

from os import path
from lxml import etree
from logging import getLogger, Formatter

from AccessControl import ClassSecurityInfo

from Products.ERP5Type.Tool.BaseTool import BaseTool
from Products.ERP5Type import Permissions
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5SyncML.SyncMLConstant import ACTIVITY_PRIORITY, \
from Products.ERP5SyncML.SyncMLMessage import SyncMLResponse, SyncMLRequest
from Products.ERP5SyncML.Engine.SynchronousEngine import SyncMLSynchronousEngine
from Products.ERP5SyncML.Engine.AsynchronousEngine import SyncMLAsynchronousEngine
from Products.ERP5SyncML.Transport.HTTP import HTTPTransport
from Products.ERP5SyncML.Transport.File import FileTransport
from Products.ERP5SyncML.Transport.Mail import MailTransport
from Products.ERP5.ERP5Site import getSite

synchronous_engine = SyncMLSynchronousEngine()
asynchronous_engine = SyncMLAsynchronousEngine()

transport_scheme_dict = {
  "http" : HTTPTransport(),
  "https" : HTTPTransport(),
  "file" : FileTransport(),
  "mail" : MailTransport(),

parser = etree.XMLParser(remove_blank_text=True)

# Logging channel definitions
# Main logging channel
syncml_logger = getLogger('ERP5SyncML')
# Direct logging to "[instancehome]/log/ERP5SyncML.log", if this
# directory exists. Otherwise, it will end up in root logging
# facility (ie, event.log).
from App.config import getConfiguration
instancehome = getConfiguration().instancehome
if instancehome is not None:
  log_directory = path.join(instancehome, 'log')
  if path.isdir(log_directory):
    from Signals import Signals
    from ZConfig.components.logger.loghandler import FileHandler
    log_file_handler = FileHandler(path.join(log_directory,
    # Default zope log format string borrowed from
    # ZConfig/components/logger/factory.xml, but without the extra "------"
    # line separating entries.
      "%(asctime)s %(levelname)s %(name)s %(message)s",
    syncml_logger.propagate = 0

def checkAlertCommand(syncml_request):
  This parse the alert commands received and return a
  dictionnary mapping database to sync mode
  database_alert_list = []
  # XXX To be moved on engine
  search = getSite().portal_categories.syncml_alert_code.searchFolder
  for alert in syncml_request.alert_list:
    if alert["data"] == "222":
      # 222 is for asking next message, do not care
    # Retrieve the category
    # XXX Categories must be redefined, ID must be code, not title so
    # that we can drop the use of searchFolder
    alert_code_category_list = search(reference=alert['data'])
    if len(alert_code_category_list) == 1:
      alert_code_category = alert_code_category_list[0].getId()
      # Must return (405) Command not allowed
      raise NotImplementedError("Alert code is %s, got %s category" %
    # Copy the whole dict & add the category id
    alert["code"] = alert_code_category

  return database_alert_list

class SynchronizationTool(BaseTool):
  """ This tool implements the SyncML Protocol

  SyncML Protocol defines how to synchronize data between clients and server.

  Here is a mapping of the specification with the implementation in this tool :
  - client are subscriptions
  - server are publications
  - change log are managed through the use of signatures. A signature contains
    the last data sent and which was successfully synchronized. When running a
    new synchronization new data is compared with the one stored in signature
    to detect changes.
  meta_type = 'ERP5 Synchronizations'
  portal_type = 'Synchronization Tool'

  id = "portal_synchronizations"

  security = ClassSecurityInfo()

  def getConflictList(self, context=None):
    Retrieve the list of all conflicts
    Here the list is as follow :
    [conflict_1,conflict2,...] where conflict_1 is like:
    conflict_list = []
    for publication in self.searchFolder(portal_type='SyncML Publication'):
      for result in publication.searchFolder(
          portal_type='SyncML Subscription'):
        subscriber = result.getObject()
        sub_conflict_list = subscriber.getConflictList()
        for conflict in sub_conflict_list:
          if context is None or conflict.getOriginValue() == context:
    for result in self.searchFolder(portal_type='SyncML Subscription'):
      subscription = result.getObject()
      sub_conflict_list = subscription.getConflictList()
      for conflict in sub_conflict_list:
        if context is None or conflict.getOriginValue() == context:
    return conflict_list

  def getDocumentConflictList(self, context=None):
    Retrieve the list of all conflicts for a given document
    Well, this is the same thing as getConflictList with a path
    return self.getConflictList(context)

  def getSubscriberDocumentVersion(self, conflict, docid):
    Given a 'conflict' and a 'docid' refering to a new version of a
    document, applies the conflicting changes to the document's new
    version. By so, two differents versions of the same document will be
    Thus, the manager will be able to open both version of the document
    before selecting which one to keep.
    subscriber = conflict.getSubscriber()
    publisher_object = conflict.getOrigineValue()
    publisher_xml = self.getXMLObject(
    directory = publisher_object.aq_parent
    object_id = docid
    if object_id in directory.objectIds():
      directory._delObject(object_id)  # XXX Why not manage_delObjects ?
      # Import the conduit and get it
      conduit = subscriber.getConduit()
      conduit.addNode(xml=publisher_xml, object=directory,
      subscriber_document = directory._getOb(object_id)
      for c in self.getConflictList(conflict.getOriginValue()):
        if c.getSubscriber() == subscriber:
      return subscriber_document

  # XXX- ?
  def _getCopyId(self, document):
    directory = document.aq_inner.aq_parent
    if directory.getId() != 'portal_repository':
      document_id = document.getId() + '_conflict_copy'
      if document_id in directory.objectIds():
        directory._delObject(document_id)  # XXX manage_delObjects ?
      repotool = directory
      docid = repotool.getDocidAndRevisionFromObjectId(document.getId())[0]
      new_rev = repotool.getFreeRevision(docid) + 10  # make sure it's not gonna provoke conflicts
      document_id = repotool._getId(docid, new_rev)
    return document_id

  # XXX-Aurel : the following methods must be moved to a specific part that
  # manages protocols to send/receive messages
  def readResponse(self, text='', sync_id=None, from_url=None):
    We will look at the url and we will see if we need to send mail, http
    response, or just copy to a file.
    """'readResponse sync_id %s, text %s' % (sync_id, text))
    if text:
      # we are still anonymous at this time, use unrestrictedSearchResults
      # to fetch the Subcribers
      catalog_tool = self.getPortalObject().portal_catalog.unrestrictedSearchResults
      syncml_request = SyncMLRequest(text)

      # It is assumed that client & server does not share the same database ID
      # (source_reference); this must be checked using constraint
      for publication in catalog_tool(portal_type='SyncML Publication',
        if publication.getUrlString() == syncml_request.header['target']:
          # Do not process in activity first checking, a message ordering
          # is required by protocol specification, only use activity when no
          # race condition can happen (ie no final tag)

          # XXX For now do in activity otherwise we never answer the HTTP request
          # directly and so it ends with client/server stuck waiting for answer and
          # on the other side we are doing an http request to send them
          if publication.getIsActivityEnabled():
            return self.activate(
                publication.getPath(), text)
            return self.processServerSynchronization(publication.getPath(), text)

      for subscription in catalog_tool(portal_type='SyncML Subscription',
        if subscription.getSubscriptionUrlString() == syncml_request.header['target']:
          if subscription.getIsActivityEnabled():
            return self.activate(activity="SQLQueue",
            return self.processClientSynchronization(subscription.getPath(), text)

      # XXX maybe it is better to generate a syncml error message
      raise ValueError("Impossible to find a pub/sub to process message %s:%s"
                       % (sync_id, syncml_request.header['target']))

    # we use from only if we have a file
    elif isinstance(from_url, basestring):
      if from_url.startswith('file:'):
        filename = from_url[len('file:'):]
        xml = None
          stream = file(filename, 'r')
        except IOError:
          # XXX-Aurel : Why raising here make unit tests to fail ?
          # raise ValueError("Impossible to read file %s, error is %s"
          #                  % (filename, msg))
          xml =
        syncml_logger.debug('readResponse xml from file is %s' % (xml,))
        if xml:
          return xml
  # End of part managing protocols

  # Following methods are related to server (Publication)
  def processServerSynchronization(self, publication_path, msg=None):
      This is the synchronization method for the server
    # Read the request from the client
    publication = self.unrestrictedTraverse(publication_path)
    if publication.getIsActivityEnabled():
      engine = asynchronous_engine
      engine = synchronous_engine

    if msg is None:
      # Read message from file
      msg = self.readResponse(from_url=publication.getUrlString(),
    if msg is not None:
      syncml_request = SyncMLRequest(msg)"\tXML received from client %s" %(str(syncml_request)))

      # Get the subscriber
      subscription_url = syncml_request.header['source']
      subscriber = publication.getSubscriber(subscription_url)  # XXX method to be renamed

      # Alert commands are generated at initialization phase or when client ask
      # for the remaining messages
      database_alert_list = checkAlertCommand(syncml_request)
      assert len(database_alert_list) <= 1, "Multi-databases sync no supported"
      if len(database_alert_list):
        # We are initializing the synchronization
        if subscriber and subscriber.getSynchronizationState() not in \
              ("not_running", "initializing", "finished"):
            'Trying to start a synchronization on server side : %s although synchronisation is already running'
            % (subscriber.getPath(),))
          # Prevent initilisation if sync already running
        syncml_response = engine.processServerInitialization(
        if not subscriber:
          raise ValueError("First synchronization message must contains alert command")
          # Let engine manage the synchronization
            return engine.processServerSynchronization(subscriber, syncml_request)
          except SynchronizationError:
      # This must be implemented following the syncml protocol, not with this hack
      raise NotImplementedError("Starting sync process from server is forbidden")

    # Return message for unit test purpose
    return str(syncml_response)

  # Following methods are related to client (subscription)
  def processClientSynchronization(self, subscription_path, msg=None):
      This is the synchronization method for the client

      This is the first method called to launch a synchronization process
    subscription = self.unrestrictedTraverse(subscription_path)
    if subscription.getIsActivityEnabled():
      engine = asynchronous_engine
      engine = synchronous_engine

    if msg is None and subscription.getSubscriptionUrlString('').find('file') >= 0:
      # XXX This is a hack for unit test only, must be removed
      msg = self.readResponse(sync_id=subscription.getDestinationReference(),

    if msg is None:
      # This is a synchronization initialisation call
      # Even if call on asynchronous engine, this will not use activities
      syncml_response = engine.initializeClientSynchronization(subscription)
      syncml_request = SyncMLRequest(msg)

      if not subscription.checkCorrectRemoteMessageId(
        # Message already processed, resend the response
        # XXX How to make sure we send the good last response ?
        raise NotImplementedError
        return engine.processClientSynchronization(syncml_request, subscription)

    # Send the message
    # XXX This must depends on activity enables property, maybe use engine
    if subscription.getIsActivityEnabled():
        after_tag="%s_reset" %(subscription.getPath(),),

    return str(syncml_response)

  def applySyncCommand(self, subscription_path, response_message_id,
                       activate_kw, **kw):
    This methods is intented to be called by asynchronous engine in activity to
    apply sync commands for a subset of data
    As engines are not zodb object, the tool acts as a placeholder for method
    that need to be called in activities
    subscription = self.restrictedTraverse(subscription_path)
    assert subscription is not None, "Impossible to find subscription %s" \
        % (subscription_path)
    # Build Message
    if response_message_id:
      syncml_response = SyncMLResponse()
      syncml_response = None

    subscription.applySyncCommand(syncml_response=syncml_response, **kw)

    # Send the message in activity to prevent recomputing data in case of
    # transport failure
    if syncml_response:
      syncml_logger("---- %s sending %s notifications of sync"
                    % (subscription.getTitle(),
                            # group_method_id=None,
                            # group_method_cost=.05,

  def sendSyncCommand(self, id_list, message_id, subscription_path,
                      activate_kw, is_final_message=False):
    This methods is intented to be called by asynchronous engine in activity to
    send sync commands for a subset of data
    As engines are not zodb object, the tool acts as a placeholder for method
    that need to be called in activities
    subscription = self.restrictedTraverse(subscription_path)
    assert subscription is not None, "Impossible to find subscription %s" \
        % (subscription_path)
    # Build Message
    syncml_response = SyncMLResponse()


    if is_final_message:
      # Notify that all modifications were sent

    # Send the message in activity to prevent recomputing data in case of
    # transport failure
    # activate_kw["group_method_id"] = None
    # activate_kw["group_method_cost"] = .05
