Subscription.py 18.1 KB
Newer Older
Jean-Paul Smets's avatar
Jean-Paul Smets committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
##############################################################################
#
# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved.
#          Sebastien Robin <seb@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################

from Globals import PersistentMapping
from time import gmtime,strftime # for anchors
from SyncCode import SyncCode
Sebastien Robin's avatar
Sebastien Robin committed
32 33
from Products.CMFCore.utils import getToolByName
from Acquisition import Implicit, aq_base
Jean-Paul Smets's avatar
Jean-Paul Smets committed
34 35 36 37
from zLOG import LOG

import md5

Sebastien Robin's avatar
Sebastien Robin committed
38
class Conflict(SyncCode, Implicit):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
39 40 41
  """
    object_path : the path of the obect
    keyword : an identifier of the conflict
42 43
    publisher_value : the value that we have locally
    subscriber_value : the value sent by the remote box
Jean-Paul Smets's avatar
Jean-Paul Smets committed
44 45

  """
46 47
  def __init__(self, object_path=None, keyword=None, xupdate=None, publisher_value=None,\
               subscriber_value=None, subscriber=None):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
48 49
    self.object_path=object_path
    self.keyword = keyword
50 51 52
    self.setLocalValue(publisher_value)
    self.setRemoteValue(subscriber_value)
    self.subscriber = subscriber
53
    self.resetXupdate()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
54 55 56 57 58 59 60

  def getObjectPath(self):
    """
    get the domain
    """
    return self.object_path

61
  def getPublisherValue(self):
62 63 64
    """
    get the domain
    """
65
    return self.publisher_value
66

67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
  def getXupdateList(self):
    """
    get the xupdate wich gave an error
    """
    xupdate_list = []
    if len(self.xupdate)>0:
      for xupdate in self.xupdate:
        xupdate_list+= [xupdate]
    return xupdate_list

  def resetXupdate(self):
    """
    Reset the xupdate list
    """
    self.xupdate = PersistentMapping()

  def setXupdate(self, xupdate):
    """
    set the xupdate
    """
    if xupdate == None:
      self.resetXupdate()
    else:
      self.xupdate = self.getXupdateList() + [xupdate]

  def setXupdateList(self, xupdate):
    """
    set the xupdate
    """
    self.xupdate = xupdate

98 99 100 101 102
  def setLocalValue(self, value):
    """
    get the domain
    """
    try:
103
      self.publisher_value = value
104
    except TypeError: # It happens when we try to store StringIO
105
      self.publisher_value = None
106

107
  def getSubscriberValue(self):
108 109 110
    """
    get the domain
    """
111
    return self.subscriber_value
112 113 114 115 116 117

  def setRemoteValue(self, value):
    """
    get the domain
    """
    try:
118
      self.subscriber_value = value
119
    except TypeError: # It happens when we try to store StringIO
120
      self.subscriber_value = None
121

122
  def applyPublisherValue(self):
Sebastien Robin's avatar
Sebastien Robin committed
123 124 125 126 127
    """
      after a conflict resolution, we have decided
      to keep the local version of this object
    """
    p_sync = getToolByName(self,'portal_synchronizations')
128
    p_sync.applyPublisherValue(self)
Sebastien Robin's avatar
Sebastien Robin committed
129

130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146
  def applyPublisherDocument(self):
    """
      after a conflict resolution, we have decided
      to keep the local version of this object
    """
    p_sync = getToolByName(self,'portal_synchronizations')
    p_sync.applyPublisherDocument(self)

  def applySubscriberDocument(self):
    """
      after a conflict resolution, we have decided
      to keep the local version of this object
    """
    p_sync = getToolByName(self,'portal_synchronizations')
    p_sync.applySubscriberDocument(self)

  def applySubscriberValue(self):
Sebastien Robin's avatar
Sebastien Robin committed
147 148 149 150
    """
    get the domain
    """
    p_sync = getToolByName(self,'portal_synchronizations')
151
    p_sync.applySubscriberValue(self)
Sebastien Robin's avatar
Sebastien Robin committed
152

153
  def setSubscriber(self, subscriber):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
154 155 156
    """
    set the domain
    """
157
    self.subscriber = subscriber
Jean-Paul Smets's avatar
Jean-Paul Smets committed
158

159
  def getSubscriber(self):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
160 161 162
    """
    get the domain
    """
163
    return self.subscriber
Jean-Paul Smets's avatar
Jean-Paul Smets committed
164

165 166 167 168 169 170
  def getKeyword(self):
    """
    get the domain
    """
    return self.keyword

171
  def getPropertyId(self):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
172
    """
173
    get the property id
Jean-Paul Smets's avatar
Jean-Paul Smets committed
174
    """
175
    return self.keyword
Jean-Paul Smets's avatar
Jean-Paul Smets committed
176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196

class Signature(SyncCode):
  """
    status -- SENT, CONFLICT...
    md5_object -- An MD5 value of a given document
    #uid -- The UID of the document
    id -- the ID of the document
    rid -- the uid of the document on the remote database,
        only needed on the server.
    xml -- the xml of the object at the time where it was synchronized
  """

  # Constructor
  def __init__(self,id=None, status=None, xml_string=None):
    self.id = id
    self.status = status
    self.setXML(xml_string)
    self.partial_xml = None
    self.action = None
    self.setTempXML(None)
    self.resetConflictList()
197
    self.md5_string = None
Jean-Paul Smets's avatar
Jean-Paul Smets committed
198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
    self.force = 0

  #def __init__(self,object=None, status=None, xml_string=None):
  #  self.uid = object.uid
  #  self.id = object.id
  #  self.status = status
  #  self.setXML(xml_string)

  def setStatus(self, status):
    """
      set the Status (see SyncCode for numbers)
    """
    self.status = status
    if status == self.SYNCHRONIZED:
      temp_xml = self.getTempXML()
      self.setForce(0)
      if temp_xml is not None:
        # This happens when we have sent the xml
        # and we just get the confirmation
        self.setXML(self.getTempXML())
      self.setTempXML(None)
      if len(self.getConflictList())>0:
        self.resetConflictList()
221 222 223
    elif status in (self.PUB_CONFLICT_MERGE,self.SENT):
      # We have a solution for the conflict, don't need to keep the list
      self.resetConflictList()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249

  def getStatus(self):
    """
      get the Status (see SyncCode for numbers)
    """
    return self.status

  def getForce(self):
    """
      get the force value (if we need to force update or not)
    """
    return self.force

  def setForce(self, force):
    """
      set the force value (if we need to force update or not)
    """
    self.force = force

  def setXML(self, xml):
    """
      set the XML corresponding to the object
    """
    self.xml = xml
    if self.xml != None:
      self.setTempXML(None) # We make sure that the xml will not be erased
250
      self.setMD5(xml)
Jean-Paul Smets's avatar
Jean-Paul Smets committed
251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281

  def getXML(self):
    """
      set the XML corresponding to the object
    """
    return self.xml

  def setTempXML(self, xml):
    """
      This is the xml temporarily saved, it will
      be stored with setXML when we will receive
      the confirmation of synchronization
    """
    self.temp_xml = xml

  def getTempXML(self):
    """
      get the temp xml
    """
    return self.temp_xml

  def setMD5(self, xml):
    """
      set the MD5 object of this signature
    """
    self.md5_string = md5.new(xml).digest()

  def getMD5(self):
    """
      get the MD5 object of this signature
    """
282
    return self.md5_string
Jean-Paul Smets's avatar
Jean-Paul Smets committed
283 284 285 286 287 288 289 290

  def checkMD5(self, xml_string):
    """
    check if the given md5_object returns the same things as
    the one stored in this signature, this is very usefull
    if we want to know if an objects has changed or not
    Returns 1 if MD5 are equals, else it returns 0
    """
291
    return ((md5.new(xml_string).digest()) == self.getMD5())
Jean-Paul Smets's avatar
Jean-Paul Smets committed
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321

  def setRid(self, rid):
    """
      set the rid
    """
    self.rid = rid

  def getRid(self):
    """
      get the rid
    """
    return self.rid

  def setId(self, id):
    """
      set the id
    """
    self.id = id

  def getId(self):
    """
      get the id
    """
    return self.id

  def setPartialXML(self, xml):
    """
    Set the partial string we will have to
    deliver in the future
    """
322
    #LOG('Subscriber.setPartialXML before',0,'partial_xml: %s' % str(self.partial_xml))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
323
    self.partial_xml = xml
324
    #LOG('Subscriber.setPartialXML after',0,'partial_xml: %s' % str(self.partial_xml))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
325 326 327 328 329 330

  def getPartialXML(self):
    """
    Set the partial string we will have to
    deliver in the future
    """
331
    #LOG('Subscriber.getPartialXML',0,'partial_xml: %s' % str(self.partial_xml))
Jean-Paul Smets's avatar
Jean-Paul Smets committed
332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365
    return self.partial_xml

  def getAction(self):
    """
    Return the actual action for a partial synchronization
    """
    return self.action

  def setAction(self, action):
    """
    Return the actual action for a partial synchronization
    """
    self.action = action

  def getConflictList(self):
    """
    Return the actual action for a partial synchronization
    """
    conflict_list = []
    if len(self.conflict_list)>0:
      for conflict in self.conflict_list:
        conflict_list += [conflict]
    return conflict_list

  def resetConflictList(self):
    """
    Return the actual action for a partial synchronization
    """
    self.conflict_list = PersistentMapping()

  def setConflictList(self, conflict_list):
    """
    Return the actual action for a partial synchronization
    """
366
    LOG('setConflictList, list',0,conflict_list)
Sebastien Robin's avatar
Sebastien Robin committed
367
    if conflict_list is None or conflict_list==[]:
Jean-Paul Smets's avatar
Jean-Paul Smets committed
368 369
      self.resetConflictList()
    else:
Sebastien Robin's avatar
Sebastien Robin committed
370
      #new_conflict_list = []
371 372
      # If two conflicts are on the same objects, then
      # we join them, so we have a conflict with many xupdate
Sebastien Robin's avatar
Sebastien Robin committed
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399
#       for conflict in conflict_list:
#         found = None
#         for n_conflict in new_conflict_list:
#           if n_conflict.getObjectPath() == conflict.getObjectPath():
#             found = n_conflict
#         LOG('setConflictList, found',0,found)
#         if found == None:
#           new_conflict_list += [conflict]
#         else:
#           n_conflict.setXupdate(conflict.getXupdateList())
      #self.conflict_list = new_conflict_list
      self.conflict_list = conflict_list

  def delConflict(self, conflict):
    """
    Return the actual action for a partial synchronization
    """
    LOG('delConflict, conflict',0,conflict)
    conflict_list = []
    for c in self.getConflictList():
      LOG('delConflict, c==conflict',0,c==aq_base(conflict))
      if c != aq_base(conflict):
        conflict_list += [c]
    if conflict_list != []:
      self.setConflictList(conflict_list)
    else:
      self.resetConflictList()
Jean-Paul Smets's avatar
Jean-Paul Smets committed
400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469

class Subscription(SyncCode):
  """
    Subscription hold the definition of a master ODB
    from/to which a selection of objects will be synchronised

    Subscription defined by::

    publication_url -- a URI to a publication

    subsribtion_url -- URL of ourselves

    destination_path -- the place where objects are stored

    query   -- a query which defines a local set of documents which
           are going to be synchronised

    xml_mapping -- a PageTemplate to map documents to XML

    Subscription also holds private data to manage
    the synchronisation. We choose to keep an MD5 value for
    all documents which belong to the synchronisation process::

    signatures -- a dictionnary which contains the signature
           of documents at the time they were synchronized

    session_id -- it defines the id of the session
         with the server.

    last_anchor - it defines the id of the last synchronisation

    next_anchor - it defines the id of the current synchronisation

  """

  signatures = PersistentMapping()

  # Constructor
  def __init__(self, id, publication_url, subscription_url, destination_path, query, xml_mapping):
    """
      We need to create a dictionnary of
      signatures of documents which belong to the synchronisation
      process
    """
    self.id = id
    self.publication_url = (publication_url)
    self.subscription_url = str(subscription_url)
    self.destination_path = str(destination_path)
    self.query = query
    self.xml_mapping = xml_mapping
    self.anchor = None
    self.session_id = 0
    self.signatures = PersistentMapping()
    self.last_anchor = '00000000T000000Z'
    self.next_anchor = '00000000T000000Z'
    self.domain_type = self.SUB
    #self.signatures = PersitentMapping()

  # Accessors
  def getRemoteId(self, id, path=None):
    """
      Returns the remote id from a know local id
      Returns None if...
      path allows to implement recursive sync
    """
    pass

  def getSynchronizationType(self, default=None):
    """
    """
470 471 472 473 474 475 476
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    # XXX for debugging only, to be removed
    dict_sign = {}
    for object_id in self.signatures.keys():
      dict_sign[object_id] = self.signatures[object_id].getStatus()
    LOG('getSignature',0,'signatures_status: %s' % str(dict_sign))
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
Jean-Paul Smets's avatar
Jean-Paul Smets committed
477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699
    code = self.SLOW_SYNC
    if len(self.signatures.keys()) > 0:
      code = self.TWO_WAY
    if default is not None:
      code = default
    LOG('Subscription',0,'getSynchronizationType keys: %s' % str(self.signatures.keys()))
    LOG('Subscription',0,'getSynchronizationType: %s' % code)
    return code

  def getLocalId(self, rid, path=None):
    """
      Returns the local id from a know remote id
      Returns None if...
    """
    pass

  def getId(self):
    """
      return the ID
    """
    return self.id

  def setId(self, id):
    """
      set the ID
    """
    self.id = id

  def getQuery(self):
    """
      return the query
    """
    return self.query

  def setQuery(self, query):
    """
      set the query
    """
    self.query = query

  def getPublicationUrl(self):
    """
      return the publication url
    """
    return self.publication_url

  def getLocalUrl(self):
    """
      return the publication url
    """
    return self.publication_url

  def setPublicationUrl(self, publication_url):
    """
      return the publication url
    """
    self.publication_url = publication_url

  def getXML_Mapping(self):
    """
      return the xml mapping
    """
    return self.xml_mapping

  def setXML_Mapping(self, xml_mapping):
    """
      return the xml mapping
    """
    self.xml_mapping = xml_mapping

  def getSubscriptionUrl(self):
    """
      return the subscription url
    """
    return self.subscription_url

  def setSubscriptionUrl(self, subscription_url):
    """
      set the subscription url
    """
    self.subscription_url = subscription_url

  def getDestinationPath(self):
    """
      return the destination path
    """
    return self.destination_path

  def setDestinationPath(self, destination_path):
    """
      set the destination path
    """
    self.destination_path = destination_path

  def getSubscription(self):
    """
      return the current subscription
    """
    return self

  def getSessionId(self):
    """
      return the session id
    """
    self.session_id += 1
    return self.session_id

  def getLastAnchor(self):
    """
      return the id of the last synchronisation
    """
    return self.last_anchor

  def getNextAnchor(self):
    """
      return the id of the current synchronisation
    """
    return self.next_anchor

  def setLastAnchor(self, last_anchor):
    """
      set the value last anchor
    """
    self.last_anchor = last_anchor

  def setNextAnchor(self, next_anchor):
    """
      set the value next anchor
    """
    # We store the old next anchor as the new last one
    self.last_anchor = self.next_anchor
    self.next_anchor = next_anchor

  def NewAnchor(self):
    """
      set a new anchor
    """
    self.last_anchor = self.next_anchor
    self.next_anchor = strftime("%Y%m%dT%H%M%SZ", gmtime())

  def resetAnchors(self):
    """
      reset both last and next anchors
    """
    self.last_anchor = self.NULL_ANCHOR
    self.next_anchor = self.NULL_ANCHOR

  def addSignature(self, signature):
    """
      add a Signature to the subscription
    """
    self.signatures[signature.id] = signature

  def delSignature(self, id):
    """
      add a Signature to the subscription
    """
    del self.signatures[id]

  def getSignature(self, id):
    """
      add a Signature to the subscription
    """
    # This is just a test XXX To be removed
    #dict = {}
    #for key in self.signatures.keys():
    #  dict[key]=self.signatures[key].getPartialXML()
    #LOG('Subscription',0,'dict: %s' % str(dict))
    if self.signatures.has_key(id):
      return self.signatures[id]
    return None

  def getSignatureList(self):
    """
      add a Signature to the subscription
    """
    signature_list = []
    for key in self.signatures.keys():
      signature_list += [self.signatures[key]]
    return signature_list

  def hasSignature(self, id):
    """
      Check if there's a signature with this uid
    """
    LOG('Subscription',0,'keys: %s' % str(self.signatures.keys()))
    return self.signatures.has_key(id)

  def resetAllSignatures(self):
    """
      Reset all signatures
    """
    self.signatures = PersistentMapping()

  def getIdList(self):
    """
    Returns the list of ids from signature
    """
    return self.signatures.keys()

  def getConflictList(self):
    """
    Return the list of all conflicts from all signatures
    """
    conflict_list = []
    for signature in self.getSignatureList():
      conflict_list += signature.getConflictList()
    return conflict_list

  def startSynchronization(self):
    """
    Set the status of every object as NOT_SYNCHRONIZED
    """
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    # XXX for debugging only, to be removed
    dict_sign = {}
    for object_id in self.signatures.keys():
      dict_sign[object_id] = self.signatures[object_id].getStatus()
    LOG('startSynchronization',0,'signatures_status: %s' % str(dict_sign))
    # XXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    for object_id in self.signatures.keys():
      # Change the status only if we are not in a conflict mode
      if not(self.signatures[object_id].getStatus() in (self.CONFLICT,self.PUB_CONFLICT_MERGE,
700
                                                        self.PUB_CONFLICT_CLIENT_WIN)):
Jean-Paul Smets's avatar
Jean-Paul Smets committed
701 702 703
        self.signatures[object_id].setStatus(self.NOT_SYNCHRONIZED)
        self.signatures[object_id].setPartialXML(None)
        self.signatures[object_id].setTempXML(None)