Commit 2d2c38f9 authored by Aurel's avatar Aurel

implement better deletion mecanism & remove workflow transition overhead

parent 1b4cd9c7
......@@ -28,7 +28,6 @@
##############################################################################
from base64 import b16encode, b16decode
from warnings import warn
from logging import getLogger
from urlparse import urlparse
from lxml import etree
......@@ -66,6 +65,8 @@ syncml_logger = getLogger('ERP5SyncML')
MAX_OBJECT_PER_MESSAGE = 300
RETRO_COMPATIBLE = True
_MARKER = []
class SyncMLSubscription(XMLObject):
"""
......@@ -99,6 +100,82 @@ class SyncMLSubscription(XMLObject):
self.logout()
self._edit(authenticated_user=None)
security.declarePrivate('getAndIndex')
def getAndIndex(self, callback, method_kw, activate_kw, **kw):
"""
This methods is called by the asynchronous engine to index source
data in sql table
callback : method to call in activity
method_kw : callback's parameters
activate_kw : activity parameters to pass to activate call
kw : any parameter getAndActivate can required if it calls itself
"""
search_kw = dict(kw)
packet_size = search_kw.pop('packet_size', 30)
limit = packet_size * search_kw.pop('activity_count', 100)
try:
r = self.getDocumentIdList(limit=limit, **search_kw) # It is assumed that
# the result is sorted
except TypeError:
if not RETRO_COMPATIBLE:
raise
else:
syncml_logger.warning("Script %s does not accept paramaters limit=%s kw=%s" %
(self.getListMethodId(), limit, search_kw,))
r = self.getDocumentList() # It is assumed that
# the result is sorted
result_count = len(r)
generated_other_activity = False
if result_count:
syncml_logger.info("getAndIndex : got %d, %r result, limit = %d, packet %d" %
(result_count, r, limit, packet_size))
if result_count == limit:
# Recursive call to prevent too many activity generation
next_kw = dict(activate_kw, priority=1+activate_kw.get('priority', 1))
kw["min_id"] = r[-1].getId()
syncml_logger.info("--> calling getAndActivate in activity, min = %s" %
(kw["min_id"],))
self.activate(**next_kw).getAndActivate(
callback, method_kw, activate_kw, **kw)
generated_other_activity = True
r = [x.getPath() for x in r]
activate = self.getPortalObject().portal_synchronizations.activate
callback_method = getattr(activate(**activate_kw), callback)
if generated_other_activity:
for i in xrange(0, result_count, packet_size):
syncml_logger.info("-- getAndIndex : recursive call, generating for %s"
% (r[i:i+packet_size],))
callback_method(path_list=r[i:i+packet_size],
activate_kw=activate_kw,
**method_kw)
else:
if result_count > packet_size:
for i in xrange(0, result_count-packet_size, packet_size):
syncml_logger.info("-- getAndIndex : i %s, call, generating for %s : %s" %
(i, r[i:i+packet_size], activate_kw))
callback_method(path_list=r[i:i+packet_size],
**method_kw)
final_min = i + packet_size
else:
final_min = 0
syncml_logger.info("---- getAndIndex : final call for %s %s : %s" \
%(final_min, r[final_min:], activate_kw))
callback_method(path_list=r[final_min:],
activate_kw=activate_kw,
**method_kw)
return result_count
security.declarePrivate('getSearchableSourcePath')
def getSearchableSourcePath(self):
"""
Return the path of the subscription that will be used in sql table
_ char must be escaped because of the LIKE behaviour
"""
return "%s%%" % (self.getSourceValue().getPath().replace("_","\_"),)
security.declarePrivate('getAndActivate')
def getAndActivate(self, callback, method_kw, activate_kw, **kw):
"""
......@@ -121,59 +198,81 @@ class SyncMLSubscription(XMLObject):
search_kw = dict(kw)
packet_size = search_kw.pop('packet_size', 30)
limit = packet_size * search_kw.pop('activity_count', 100)
try:
r = self.getDocumentIdList(limit=limit, **search_kw) # It is assumed that
# the result is sorted
except TypeError:
syncml_logger.warning("Script %s does not accept paramaters limit=%s kw=%s" %
(self.getListMethodId(), limit, search_kw,))
r = self.getDocumentList() # It is assumed that
# the result is sorted
# We must know if we have a lower limit or not to propagate
if not kw.has_key("strict_min_gid"):
first_call = True
else:
first_call = False
search_kw.update({"stict_min_gid" : None,
"min_gid" : None,
"max_gid" : None,
"limit" : limit,
"path" : self.getSearchableSourcePath()})
r = [x.gid for x in self.z_get_syncml_gid_list(**search_kw)]
result_count = len(r)
generated_other_activity = False
if result_count:
syncml_logger.debug("getAndActivate : got %d result, limit = %d, packet %d" %
(result_count, limit, packet_size))
syncml_logger.info("getAndActivate : got %d result" % (result_count,))
if result_count == limit:
# Recursive call to prevent too many activity generation
next_kw = dict(activate_kw, priority=1+activate_kw.get('priority', 1))
kw["min_id"] = r[-1].getId()
syncml_logger.debug("--> calling getAndActivate in activity, min = %s" %
(kw["min_id"],))
kw["strict_min_gid"] = r[-1]
syncml_logger.info("--> calling getAndActivate in activity, min = %s" %
(kw["min_gid"],))
self.activate(**next_kw).getAndActivate(
callback, method_kw, activate_kw, **kw)
generated_other_activity = True
r = [x.getId() for x in r]
message_id_list = self.getNextMessageIdList(id_count=result_count)
# XXX maybe (result_count / packet_size) + 1 instead of result_count
message_id_list.reverse() # We pop each id in the following loop
activate = self.getPortalObject().portal_synchronizations.activate
callback_method = getattr(activate(**activate_kw), callback)
if generated_other_activity:
# XXX Can be factorized with following code
# upper_limit of xrange + some check ???
for i in xrange(0, result_count, packet_size):
syncml_logger.debug("-- getAndActivate : recursive call, generating for %s"
% (r[i:i+packet_size],))
callback_method(id_list=r[i:i+packet_size],
syncml_logger.info("-- getAndActivate : recursive call")
if first_call:
min_gid = None
first_call = False
else:
min_gid = r[i]
callback_method(min_gid=min_gid,
max_gid=r[i+packet_size],
message_id=message_id_list.pop(),
activate_kw=activate_kw,
**method_kw)
activate_kw=activate_kw)
else:
i = 0
for i in xrange(0, result_count-packet_size, packet_size):
syncml_logger.debug("-- getAndActivate : call, generating for %s : %s" %
(r[i:i+packet_size], activate_kw))
callback_method(id_list=r[i:i+packet_size],
message_id=message_id_list.pop(),
activate_kw=activate_kw,
**method_kw)
# Final activity must be executed after all other
syncml_logger.debug("---- getAndActivate : final call for %s : %s" %(r[i+packet_size:], activate_kw))
callback_method(id_list=r[i+packet_size:], # XXX Has to be unit tested
# with mock object
if result_count > packet_size:
for i in xrange(0, result_count-packet_size, packet_size):
syncml_logger.info("-- getAndActivate call")
if first_call:
min_gid = None
first_call = False
else:
min_gid = r[i]
callback_method(min_gid=min_gid,
max_gid=r[i+packet_size],
message_id=message_id_list.pop(),
activate_kw=activate_kw)
final_min = i + packet_size
else:
final_min = 0
# Final activity must be tell there is no upper limit
# XXX maybe re-put here the final tag of message to avoid empty message
syncml_logger.info("---- getAndActivate : final call")
if first_call:
min_gid = None
else:
min_gid = r[final_min]
callback_method(min_gid=min_gid,
max_gid=None, # No limit when last call
message_id=message_id_list.pop(),
activate_kw=activate_kw,
**method_kw)
activate_kw=activate_kw)
return result_count
security.declarePrivate('sendMessage')
......@@ -207,7 +306,6 @@ class SyncMLSubscription(XMLObject):
sync_id=self.getDestinationReference(),
content_type=self.getContentType())
def _loginUser(self, user_id=None):
"""
Log in with the user provided or defined on self
......@@ -217,7 +315,11 @@ class SyncMLSubscription(XMLObject):
if user_id:
# TODO: make it work for users existing anywhere
user_folder = self.getPortalObject().acl_users
user = user_folder.getUserById(user_id).__of__(user_folder) # __of__ might got AttributeError
try:
user = user_folder.getUserById(user_id).__of__(user_folder) # __of__ might got AttributeError
except AttributeError:
raise ValueError("User %s cannot be found in user folder, \
synchronization cannot work with this kind of user" % (user_id,))
if user is None:
raise ValueError("User %s cannot be found in user folder, \
synchronization cannot work with this kind of user" % (user_id,))
......@@ -229,13 +331,17 @@ class SyncMLSubscription(XMLObject):
% (self.getRelativeUrl()))
# XXX To be done later
def _applyAddCommand(self,):
security.declarePrivate('applyActionList')
def applyActionList(self, syncml_request, syncml_response, simulate=False):
"""
Apply the add command received, when document already exits, we
do a kind of "Replace" command instead
Browse the list of sync command received, apply them and generate answer
"""
pass
for action in syncml_request.sync_command_list:
self.applySyncCommand(
action=action,
request_message_id=syncml_request.header["message_id"],
syncml_response=syncml_response,
simulate=simulate)
security.declarePrivate('applySyncCommand')
def applySyncCommand(self, action, request_message_id, syncml_response,
......@@ -255,6 +361,7 @@ class SyncMLSubscription(XMLObject):
# First retrieve the GID of the object we want to modify
gid = action["source"] or action["target"]
# Retrieve the signature for the current GID
path_list = []
signature = self.getSignatureFromGid(gid)
if syncml_response is not None: # No response to send when no signature to create
document = self.getDocumentFromGid(gid)
......@@ -265,7 +372,7 @@ class SyncMLSubscription(XMLObject):
portal_type='SyncML Signature',
id=gid,
)
syncml_logger.debug("Created a signature for %s - document : %s"
syncml_logger.info("Created a signature for %s - document : %s"
% (signature.getPath(), document))
if document is not None:
signature.setReference(document.getPath())
......@@ -273,7 +380,7 @@ class SyncMLSubscription(XMLObject):
elif signature.getValidationState() == 'synchronized':
# Reset status of signature synchronization
signature.drift()
path_list.append(signature.getPath())
force = signature.isForce() # XXX-must check the use of this later
else:
force = True # Always erease data in this mode
......@@ -291,7 +398,6 @@ class SyncMLSubscription(XMLObject):
if not action['more_data']:
# This is the last chunk of a partial xml
# or this is just an entire data chunk
if signature and signature.hasPartialData():
# Build data with already stored data
signature.appendPartialData(incoming_data)
......@@ -304,7 +410,7 @@ class SyncMLSubscription(XMLObject):
if document is None:
# This is the default behaviour when getting an "Add" command
# we create new document from the received data
syncml_logger.debug("Calling addNode with no previous document found")
syncml_logger.info("Calling addNode with no previous document found")
add_data = conduit.addNode(xml=incoming_data,
object=destination,
signature=signature,
......@@ -390,16 +496,22 @@ class SyncMLSubscription(XMLObject):
elif action['command'] == 'Delete':
status_code="success"
document = self.getDocumentFromGid(signature.getId())
syncml_logger.info("Deleting signature %s & doc %s" %(signature.getPath(),
document.getPath()))
path_list.remove(signature.getPath())
if document is not None:
# XXX Can't we get conflict ?
# XXX Review the code to prevent retrieving document
conduit.deleteNode(xml=incoming_data,
object=destination,
object_id=document.getId())
# Delete signature
self._delObject(gid)
else:
syncml_logger.error("Document with gid is already deleted"
syncml_logger.error("Document with gid %s is already deleted"
% (gid,))
self.z_delete_data_from_path(path="%s" %(signature.getPath(),))
else:
raise ValueError("Unknown command %s" %(action['command'],))
......@@ -432,7 +544,7 @@ class SyncMLSubscription(XMLObject):
message_ref=request_message_id)
else: # We want to retrieve more data
syncml_logger.debug("we need to retrieve more data for %s" % (signature,))
syncml_logger.info("we need to retrieve more data for %s" % (signature,))
if signature.getValidationState() != 'partial':
signature.changeToPartial()
signature.appendPartialData(incoming_data)
......@@ -455,46 +567,8 @@ class SyncMLSubscription(XMLObject):
source=self.getSourceReference(),
last_anchor=self.getLastAnchor(),
next_anchor=self.getNextAnchor())
security.declarePrivate('applyActionList')
def applyActionList(self, syncml_request, syncml_response, simulate=False):
"""
Browse the list of sync command received, apply them and generate answer
"""
for action in syncml_request.sync_command_list:
self.applySyncCommand(
action=action,
request_message_id=syncml_request.header["message_id"],
syncml_response=syncml_response,
simulate=simulate)
def _getDeletedData(self, syncml_response=None):
"""
Add delete command to syncml resposne
"""
# XXX property must be renamed to activity_enabled
if self.getIsActivityEnabled():
self.recurseCallMethod(
method_id="getId",
min_depth=1,
max_depth=1,
activate_kw={'priority': ACTIVITY_PRIORITY,
'group_method_id' : "%s/checkAndSendDeleteMessage"
% (self.getRelativeUrl()),
'tag' : "%s_delete" % self.getRelativeUrl()})
self.activate(after_tag="%s_delete" %(self.getRelativeUrl()),
priority=ACTIVITY_PRIORITY+1,
)._sendFinalMessage()
else:
# XXX not efficient at all but must not be used (former way)
syncml_logger.warning("Using non-efficient way to retrieve delete object on %s"
% (self.getRelativeUrl(),))
id_list = [x.getId() for x in self.objectValues() if \
x.getValidationState() == "not_synchronized"]
for gid in id_list:
syncml_response.addDeleteCommand(gid=gid)
# Index signature with their new value
self.SQLCatalog_indexSyncMLDocumentList(path_list)
def _sendFinalMessage(self):
"""
......@@ -516,78 +590,78 @@ class SyncMLSubscription(XMLObject):
'priority' :ACTIVITY_PRIORITY + 1,
'tag' : "%s_delete" %(self.getRelativeUrl(),)
}
syncml_logger.warning("Sending final message for modificationson on %s"
syncml_logger.info("Sending final message for modificationson on %s"
% (self.getRelativeUrl(),))
self.activate(**final_activate_kw).sendMessage(xml=str(syncml_response))
def checkAndSendDeleteMessage(self, message_list):
"""
This is a group method that will be invoked for a message list
It check signature synchronization state to know which one has
to be deleted and send the syncml message
"""
syncml_logger.warning("Checking deleted signature on %s"
% (self.getRelativeUrl(),))
to_delete_id_list = []
for m in message_list:
if m[0].getValidationState() == "not_synchronized":
to_delete_id_list.append(m[0].getId())
syncml_logger.warning("\tdeleted object is %s"
% (to_delete_id_list,))
if len(to_delete_id_list):
syncml_response = SyncMLResponse()
syncml_response.addHeader(
session_id=self.getSessionId(),
message_id=self.getNextMessageId(),
target=self.getUrlString(),
source=self.getSubscriptionUrlString())
syncml_response.addBody()
for gid in to_delete_id_list:
syncml_response.addDeleteCommand(gid=gid)
syncml_logger.info("%s sendDeleteCommand for %s"
% (self.getRelativeUrl(), to_delete_id_list))
self.activate(activity="SQLQueue",
tag="%s_delete" % (self.getRelativeUrl(),),
priority=ACTIVITY_PRIORITY).sendMessage(xml=str(syncml_response))
def _getSyncMLData(self, syncml_response, id_list=None):
def getSearchablePath(self):
return "%s%%" %(self.getPath().replace('_', '\_'),)
def _getSyncMLData(self, syncml_response, min_gid, max_gid):
"""
XXX Comment to be fixed
Compare data from source with data stored in signature from previous
synchronization. If there is any change, add command into the syncml
message
syncml_response : SyncML message to fill with command
min_gid = the lower limit for browsing data
max_gid = the upper limit for browsing data
"""
if not id_list:
syncml_logger.warning("Non optimal call to _getSyncMLData, no id list provided : %r" %(id_list))
else:
syncml_logger.info("getSyncMLData, id list provided %s" % (id_list,))
syncml_logger.info("getSyncMLData, min %s - max %r" % (min_gid, max_gid,))
conduit = self.getConduit()
finished = True
conduit = self.getConduit()
if isinstance(conduit, basestring):
conduit = getConduitByName(conduit)
try:
object_list = self.getDocumentList(id_list=id_list)
except TypeError:
# Old style script
warn("Script %s does not accept id_list paramater" %
(self.getListMethodId(),), DeprecationWarning)
object_list = self.getDocumentList()
# Compare gid list to know which data were deleted
source_gid_list = [x.gid for x in self.z_get_syncml_gid_list(
strict_min_gid=None,
min_gid=min_gid,
max_gid=max_gid,
path =self.getSearchableSourcePath(),
limit=None)]
src = self.z_get_syncml_gid_list(
src__=1,
strict_min_gid=None,
min_gid=min_gid,
max_gid=max_gid,
path = self.getSearchablePath(),
limit=None)
syncml_logger.info("source %r" % (src,))
signature_list = [x.gid for x in self.z_get_syncml_gid_list(
strict_min_gid=None,
min_gid=min_gid,
max_gid=max_gid,
path = self.getSearchablePath(),
limit=None)]
signature_set = set(signature_list)
source_gid_set = set(source_gid_list) # XXX get it with mysql
deleted_signature_set = signature_set - source_gid_set
syncml_logger.info("\t---> delete signature are %r from %r - %r"
% (deleted_signature_set, signature_set, source_gid_set))
for gid in deleted_signature_set:
syncml_response.addDeleteCommand(gid=gid)
loop = 0
traverse = self.getPortalObject().restrictedTraverse
object_list = [traverse(x.path) for x in self.z_get_syncml_path_list(
min_gid=min_gid,
max_gid=max_gid,
path=self.getSearchableSourcePath())]
alert_code = self.getSyncmlAlertCode()
sync_all = alert_code in ("refresh_from_client_only", "slow_sync")
# XXX Quick & dirty hack to prevent signature creation, this must be defined
# on pub/sub instead
create_signature = alert_code != "refresh_from_client_only"
if not len(object_list) and id_list:
syncml_logger.warning("No object retrieved althoud id_list (%s) is provided"
% (id_list))
if not len(object_list) and (min_gid or max_gid):
raise ValueError("No object retrieved althoud min/max gid (%s/%s) is provided"
% (min_gid, max_gid))
path_list = []
for result in object_list:
object_path = result.getPath()
# if loop >= max_range:
......@@ -606,10 +680,14 @@ class SyncMLSubscription(XMLObject):
# no object due to a too small limit
signature = self.getSignatureFromGid(gid)
more_data = False
if signature:
syncml_logger.debug("signature is %s = %s" %(signature.getRelativeUrl(),
signature.getValidationState()))
# For the case it was never synchronized, we have to send everything
if not signature or sync_all:
# First time we send this object or the synchronization more required
# to send every data as it was never synchronized before
# Either it is the first time we get this object
# either the synchronization process required
# to send every data again as if it was never done before
document_data = conduit.getXMLFromObjectWithId(
# XXX To be renamed (getDocumentData) independant from format
document,
......@@ -628,7 +706,7 @@ class SyncMLSubscription(XMLObject):
syncml_logger.debug("Created a signature %s for gid = %s, path %s"
% (signature.getPath(), gid, document.getPath()))
if len(document_data) > MAX_LEN:
syncml_logger.debug("data too big, sending multiple message")
syncml_logger.info("data too big, sending multiple message")
more_data = True
finished = False
document_data, rest_string = cutXML(document_data, MAX_LEN)
......@@ -652,7 +730,7 @@ class SyncMLSubscription(XMLObject):
more_data=more_data,
media_type=conduit.getContentType())
elif signature.getValidationState() in ('not_synchronized',
elif signature.getValidationState() in ('not_synchronized', 'synchronized',
'conflict_resolved_with_merge'):
# We don't have synchronized this object yet but it has a signature
xml_object = conduit.getXMLFromObjectWithId(document,
......@@ -668,7 +746,8 @@ class SyncMLSubscription(XMLObject):
source_ref=signature.getId(),
sync_code='conflict_resolved_with_merge',
command='Replace')
syncml_logger.debug("\tMD5 is %s for %s" %((signature.checkMD5(xml_object)),
signature.getReference()))
if not signature.checkMD5(xml_object):
# MD5 checksum tell there is a modification of the object
if conduit.getContentType() != 'text/xml':
......@@ -687,15 +766,16 @@ class SyncMLSubscription(XMLObject):
# MD5 Checksum can detect changes like <lang/> != <lang></lang>
# but Diff generator will return no diff for it
# in this case, no need to send diff
signature.synchronize()
if signature.getValidationState() != "synchronized":
signature.synchronize()
syncml_logger.debug("signature %s is synchronized"
% (signature.getRelativeUrl(),))
path_list.append(signature.getPath())
continue
# Split data if necessary
if len(data_diff) > MAX_LEN:
syncml_logger.debug("data too big, sending multiple messages")
syncml_logger.info("data too big, sending multiple messages")
more_data = True
finished = False
data_diff, rest_string = cutXML(data_diff, MAX_LEN)
......@@ -703,9 +783,8 @@ class SyncMLSubscription(XMLObject):
signature.setPartialAction(REPLACE_ACTION)
if signature.getValidationState() != 'partial':
signature.changeToPartial()
syncml_logger.debug("signature %s is partial"
syncml_logger.info("signature %s is partial"
% (signature.getRelativeUrl(),))
else:
# Store the new representation of the document
# It will be copy to "data" property once synchronization
......@@ -715,7 +794,6 @@ class SyncMLSubscription(XMLObject):
syncml_logger.debug("signature %s is syncing"
% (signature.getRelativeUrl(),))
# Generate the command
syncml_logger.debug("will send Replace command with %s"
% (data_diff,))
......@@ -754,6 +832,7 @@ class SyncMLSubscription(XMLObject):
signature.synchronize()
syncml_logger.debug("signature %s is synchronized"
% (signature.getRelativeUrl(),))
elif signature.getValidationState() == 'partial':
# Case of partially sent data
xml_string = signature.getPartialData()
......@@ -777,11 +856,14 @@ class SyncMLSubscription(XMLObject):
signature.doSync()
syncml_logger.debug("signature %s is syncing"
% (signature.getRelativeUrl(),))
elif signature.getValidationState() in ('syncing', 'synchronized'):
elif signature.getValidationState() in ('syncing'):
raise ValueError("Must not get signature in %s state here, signature is %s"
% (signature.getValidationState(),
signature.getPath(),))
if signature:
path_list.append(signature.getPath())
if not more_data:
pass
else:
......@@ -790,8 +872,9 @@ class SyncMLSubscription(XMLObject):
else:
syncml_logger.warning("Package is going to be splitted")
break
loop += 1
syncml_logger.debug("_getSyncMLData end with finished %s"
self.SQLCatalog_indexSyncMLDocumentList(path_list)
syncml_logger.info("_getSyncMLData end with finished %s"
% (finished,))
return finished
......@@ -900,7 +983,10 @@ class SyncMLSubscription(XMLObject):
try:
result_list = query_method(context_document=self, **kw)
except TypeError:
result_list = query_method(**kw)
if not RETRO_COMPATIBLE:
raise
else:
result_list = query_method(**kw)
else:
raise KeyError, 'This Subscriber %s provide no list method:%r'\
% (self.getPath(), list_method_id)
......@@ -1028,22 +1114,43 @@ class SyncMLSubscription(XMLObject):
conflict_list.extend(signature.getConflictList())
return conflict_list
security.declareProtected(Permissions.ModifyPortalContent,
'initialiseSynchronization')
def initialiseSynchronization(self):
'indexSourceData')
def indexSourceData(self, client=False):
"""
Set the status of every signature as not_synchronized
Index source data into mysql for ensemble comparison
This depends on synchronization type
"""
if self.getIsActivityEnabled():
self.getAndActivateResetSignature()
else:
for signature in self.contentValues(portal_type='SyncML Signature'):
# Change the status only if we are not in a conflict mode
if signature.getValidationState() not in (
'conflict',
'conflict_resolved_with_merge',
'conflict_resolved_with_client_command_winning'):
signature.reset()
if (client and self.getSyncmlAlertCode() not in \
("one_way_from_server", "refresh_from_server_only")) or \
(not client and self.getSyncmlAlertCode() not in \
("one_way_from_client", "refresh_from_client_only")):
portal = self.getPortalObject()
pref = portal.portal_preferences
# First we must unindex everything
portal.z_unindex_syncml_data(path=self.getSearchableSourcePath())
if self.getIsActivityEnabled():
activate_kw = {
'activity' : 'SQLQueue',
'tag' : self.getRelativeUrl(),
'priority' :ACTIVITY_PRIORITY
}
self.getAndIndex(
callback="SQLCatalog_indexSyncMLDocumentList",
method_kw={'subscription_path' : self.getRelativeUrl()},
activate_kw=activate_kw,
packet_size=pref.getPreferredDocumentRetrievedPerActivityCount(),
activity_count=pref.getPreferredRetrievalActivityCount(),
)
else:
r = [x.getPath() for x in self.getDocumentList()]
syncml_logger.info("indexing data from %s : %r" %(self.getPath(), r))
portal.SQLCatalog_indexSyncMLDocumentList(
path_list=r[:],
subscription_path=self.getRelativeUrl())
security.declareProtected(Permissions.ModifyPortalContent,
'getAndActivateResetSignature')
......
......@@ -78,7 +78,10 @@ class SyncMLAsynchronousEngine(EngineMixin):
syncml_response = self._generateBaseResponse(subscription)
syncml_response.addFinal()
else:
self.runGetAndActivate(subscription=subscription, tag=tag)
# Make sure it is launched after indexation step
self.runGetAndActivate(subscription=subscription, tag=tag,
after_method_id=("getAndIndex",
"SQLCatalog_indexSyncMLSignatureList"))
syncml_logger.info("X-> Client is sendind modification in activities")
# As we generated all activities to send data at once, process must not
# go back here, go into processing state thus status will be applied and
......@@ -163,10 +166,10 @@ class SyncMLAsynchronousEngine(EngineMixin):
# Apply command & send modifications
# Apply status about object send & synchronized if any
sync_status_counter = self._readStatusList(syncml_request, subscriber,
self._readStatusList(syncml_request, subscriber,
generate_alert=True)
syncml_response = None
tag = subscription_path = subscriber.getRelativeUrl()
tag = subscriber.getRelativeUrl()
after_method_id = None
if subscriber.getSynchronizationState() == "sending_modifications":
if syncml_request.isFinal:
......@@ -202,15 +205,13 @@ class SyncMLAsynchronousEngine(EngineMixin):
if syncml_request.isFinal:
# Server then sends its modifications
subscriber.sendModifications()
# Now that everything is ok, init sync information
if subscriber.getSyncmlAlertCode() not in ("one_way_from_client",
"refresh_from_client_only"):
# Reset signature only if we have to check modifications on server side
subscriber.initialiseSynchronization()
# Run indexation only once client have sent its modifications
subscriber.indexSourceData()
# Start to send modification only once we have processed
# all message from client
after_method_id='processServerSynchronization',
after_method_id=('processServerSynchronization',
'SQLCatalog_indexSyncMLDocumentList')
# XXX after tag might also be required to make sure all data are indexed
tag = (tag, "%s_reset" % subscriber.getPath(),)
# Do not continue in elif, as sending modifications is done in the same
# package as sending notifications
......@@ -242,10 +243,9 @@ class SyncMLAsynchronousEngine(EngineMixin):
after_tag=tag).sendMessage(
xml=str(syncml_response))
def runGetAndActivate(self, subscription, tag, after_method_id=None):
"""
Generate tag and method parameter and call the getAndActivate method
Launch the browsing of GID that will call the generation of syncml commands
"""
activate_kw = {
'activity' : 'SQLQueue',
......@@ -253,20 +253,16 @@ class SyncMLAsynchronousEngine(EngineMixin):
'tag' :tag,
'priority' :ACTIVITY_PRIORITY
}
method_kw = {
'subscription_path' : subscription.getRelativeUrl(),
}
pref = getSite().portal_preferences
subscription.getAndActivate(
callback="sendSyncCommand",
method_kw=method_kw,
activate_kw=activate_kw,
packet_size=pref.getPreferredDocumentRetrievedPerActivityCount(),
activity_count=pref.getPreferredRetrievalActivityCount(),
)
# Then get deleted document
# this will send also the final message of this sync part
subscription.activate(after_tag=tag)._getDeletedData()
# then send the final message of this sync part
subscription.activate(after_tag=tag,
priority=ACTIVITY_PRIORITY+1)._sendFinalMessage()
return True
......
......@@ -145,9 +145,10 @@ class EngineMixin(object):
signature = domain.getSignatureFromGid(object_gid)
if status['status_code'] == resolveSyncmlStatusCode('success'):
if signature:
domain.z_delete_data_from_path(path=signature.getPath())
domain._delObject(signature.getId())
else:
raise ValueError("Found no signature to delete")
raise ValueError("Found no signature to delete for gid %s" %(object_gid,))
else:
raise ValueError("Unknown status code : %r" % (status['status_code'],))
syncml_logger.error("\tObject deleted %s" %
......@@ -191,10 +192,8 @@ class EngineMixin(object):
if subscription.getAuthenticationState() != 'logged_in':
# Workflow action
subscription.login()
if subscription.getSyncmlAlertCode() not in ("one_way_from_server",
"refresh_from_server_only"):
# Reset signature only if client send its modification to server
subscription.initialiseSynchronization()
subscription.indexSourceData(client=True)
# Create the package 1
syncml_response = SyncMLResponse()
......@@ -301,7 +300,9 @@ class EngineMixin(object):
'one_way_from_server',
'refresh_from_client_only',
'one_way_from_client'):
# XXX Why re-editing here ?
# Make sure we update configuration based on publication data
# so that manual edition is propagated
# XXX Must check all properties that must be setted
subscriber.setXmlBindingGeneratorMethodId(
publication.getXmlBindingGeneratorMethodId())
subscriber.setConduitModuleId(publication.getConduitModuleId())
......
......@@ -29,6 +29,7 @@ from logging import getLogger
from Products.ERP5SyncML.Engine.EngineMixin import EngineMixin
from Products.ERP5SyncML.SyncMLConstant import SynchronizationError
from Products.ERP5.ERP5Site import getSite
syncml_logger = getLogger('ERP5SyncML')
......@@ -81,13 +82,12 @@ class SyncMLSynchronousEngine(EngineMixin):
# We only get data from server
finished = True
else:
finished = subscription._getSyncMLData(
syncml_response=syncml_response,
)
finished = subscription._getSyncMLData(syncml_response=syncml_response,
min_gid=None, max_gid=None)
syncml_logger.info("-> Client sendind modification, finished %s" % (finished,))
if finished:
# Add deleted objets
subscription._getDeletedData(syncml_response=syncml_response)
#subscription._getDeletedData(syncml_response=syncml_response)
# Notify that all modifications were sent
syncml_response.addFinal()
# Will then start processing sync commands from server
......@@ -191,10 +191,8 @@ class SyncMLSynchronousEngine(EngineMixin):
if syncml_request.isFinal:
# Server will now send its modifications
subscriber.sendModifications()
if subscriber.getSyncmlAlertCode() not in ("one_way_from_client",
"refresh_from_client_only"):
# Reset signature only if we have to check modifications on server side
subscriber.initialiseSynchronization()
# Run indexation only once client has sent its modifications
subscriber.indexSourceData()
# Do not continue in elif, as sending modifications is done in the same
# package as sending notifications
......@@ -205,11 +203,11 @@ class SyncMLSynchronousEngine(EngineMixin):
# We only get data from client
finished = True
else:
finished = subscriber._getSyncMLData(
syncml_response=syncml_response)
finished = subscriber._getSyncMLData(syncml_response=syncml_response,
min_gid=None, max_gid=None)
syncml_logger.info("-> Server sendind data, finished %s" % (finished,))
if finished:
subscriber._getDeletedData(syncml_response=syncml_response)
#subscriber._getDeletedData(syncml_response=syncml_response)
syncml_response.addFinal()
subscriber.waitNotifications()
# Do not go into finished here as we must wait for
......
......@@ -399,13 +399,14 @@ class SynchronizationTool(BaseTool):
return str(syncml_response)
# As engines are not zodb objects, the tool acts as a placeholder for methods
# that need to be called in activities
def applySyncCommand(self, subscription_path, response_message_id,
activate_kw, **kw):
"""
This methods is intented to be called by asynchronous engine in activity to
apply sync commands for a subset of data
As engines are not zodb object, the tool acts as a placeholder for method
that need to be called in activities
"""
subscription = self.restrictedTraverse(subscription_path)
assert subscription is not None, "Impossible to find subscription %s" \
......@@ -437,13 +438,11 @@ class SynchronizationTool(BaseTool):
def sendSyncCommand(self, id_list, message_id, subscription_path,
activate_kw, is_final_message=False):
def sendSyncCommand(self, gid_list, message_id, subscription_path,
activate_kw, first_call=False, last_call=False):
"""
This methods is intented to be called by asynchronous engine in activity to
send sync commands for a subset of data
As engines are not zodb object, the tool acts as a placeholder for method
that need to be called in activities
"""
subscription = self.restrictedTraverse(subscription_path)
assert subscription is not None, "Impossible to find subscription %s" \
......@@ -457,17 +456,14 @@ class SynchronizationTool(BaseTool):
source=subscription.getSubscriptionUrlString())
syncml_response.addBody()
subscription._getSyncMLData(
syncml_response=syncml_response,
id_list=id_list,
gid_list=gid_list,
first_call=first_call,
last_call=last_call,
)
if is_final_message:
# Notify that all modifications were sent
syncml_response.addFinal()
# Send the message in activity to prevent recomputing data in case of
# Send the message in activity to prevent recomputation of data in case of
# transport failure
# activate_kw["group_method_id"] = None
# activate_kw["group_method_cost"] = .05
......
......@@ -44,6 +44,7 @@ from Products.ERP5SyncML.Document import SyncMLSubscription
from Products.ERP5SyncML.tests.testERP5SyncMLMixin import TestERP5SyncMLMixin \
as TestMixin
from Products.ERP5Type.tests.backportUnittest import expectedFailure
from _mysql_exceptions import OperationalError
class TestERP5SyncMLMixin(TestMixin):
......@@ -98,6 +99,8 @@ class TestERP5SyncMLMixin(TestMixin):
def afterSetUp(self):
"""Setup."""
self.login()
self.portal.z_drop_syncml()
self.portal.z_create_syncml()
# This test creates Person inside Person, so we modifiy type information to
# allow anything inside Person (we'll cleanup on teardown)
self.getTypesTool().getTypeInfo('Person').filter_content_types = 0
......@@ -228,6 +231,7 @@ class TestERP5SyncMLMixin(TestMixin):
result = portal_sync.processClientSynchronization(subscription.getPath())
self.tic()
nb_message += 1
self.tic()
return nb_message
def synchronizeWithBrokenMessage(self, id):
......@@ -948,6 +952,8 @@ return [context[%r]]
person_server.manage_delObjects(self.id1)
person_client1 = self.getPersonClient1()
person_client1.manage_delObjects(self.id2)
# import ipdb
# ipdb.set_trace()
self.synchronize(self.sub_id1)
self.synchronize(self.sub_id2)
self.checkSynchronizationStateIsSynchronized()
......@@ -1543,7 +1549,7 @@ return [context[%r]]
self.assertEquals(client_person.getLastName(), self.last_name1)
# reset for refresh sync
# after synchronize, the client object retrieve value of server
# after synchronization, the client retrieves value from server
self.resetSignaturePublicationAndSubscription()
self.synchronize(self.sub_id1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment