diff --git a/product/ERP5SyncML/Conduit/VCardConduit.py b/product/ERP5SyncML/Conduit/VCardConduit.py new file mode 100755 index 0000000000000000000000000000000000000000..059476433717a56f9d6c91cab26ac4f3bb30fab1 --- /dev/null +++ b/product/ERP5SyncML/Conduit/VCardConduit.py @@ -0,0 +1,231 @@ +############################################################################## +# +# Copyright (c) 2002 Nexedi SARL and Contributors. All Rights Reserved. +# Fabien Morin <fabien.morin@gmail.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsability of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# garantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## + +from Products.ERP5SyncML.Conduit.ERP5Conduit import ERP5Conduit +from AccessControl import ClassSecurityInfo +from Products.ERP5Type import Permissions +from Products.ERP5Type.Utils import convertToUpperCase +from Products.CMFCore.utils import getToolByName +from Products.ERP5SyncML.SyncCode import SyncCode +from Products.ERP5SyncML.Subscription import Subscription +from Acquisition import aq_base, aq_inner, aq_chain, aq_acquire +from ZODB.POSException import ConflictError + +from zLOG import LOG + +class VCardConduit(ERP5Conduit, SyncCode): + """ + A conduit is in charge to read data from a particular structure, + and then to save this data in another structure. + + VCardConduit is a peace of code to update VCards from text stream + """ + + + # Declarative security + security = ClassSecurityInfo() + + security.declareProtected(Permissions.AccessContentsInformation,'__init__') + def __init__(self): + self.args = {} + + + security.declareProtected(Permissions.ModifyPortalContent, 'addVCard') + def addNode(self, xml=None, object=None, previous_xml=None, + object_id=None, sub_object=None, force=0, simulate=0, **kw): + """ + add a new person corresponding to the vcard + if the person already exist, she's updated + """ + LOG('VCardConduit',0,'addNode, object=%s, object_id=%s, sub_object:%s, \ + xml:\n%s' % (str(object), str(object_id), str(sub_object), xml)) + portal_type = 'Person' #the VCard can just use Person + if sub_object is None: + + new_object = ERP5Conduit.constructContent(self, object, object_id , + None, portal_type) + else: #if the object exist, it juste must be update + new_object=sub_object + LOG('addNode', 0, 'new_object:%s, sub_object:%s' % (new_object, sub_object)) + self.updateNode(xml=xml, + object=new_object, + force=force, + simulate=simulate, + **kw) + #in a first time, conflict are not used + return {'conflict_list':None, 'object': new_object} + + def deleteNode(self, xml=None, object=None, object_id=None, force=None, + simulate=0, **kw): + """ + A node is deleted + """ + LOG('deleteNode :', 0, 'object:%s, object_id:%s' % (str(object), str(object_id))) + conflict_list = [] + try: + object._delObject(object_id) + except (AttributeError, KeyError): + LOG('VCardConduit',0,'deleteNode, Unable to delete: %s' % str(object_id)) + return conflict_list + + def updateNode(self, xml=None, object=None, previous_xml=None, force=0, + simulate=0, **kw): + """ + A node is updated + """ + LOG('updateNode :',0, 'xml:%s, object:%s, previous_xml:%s, force:%s,simulate:%s, kw:%s' % (xml, object, previous_xml, force, simulate, kw)) + vcard_dict = self.vcard2Dict(xml) + object.edit(**vcard_dict) + return [] + + def getCapabilitiesCTTypeList(self): + """ + return the a list of CTType capabilities supported + """ + return self.MEDIA_TYPE.values() + + def getCapabilitiesVerCTList(self, capabilities_ct_type): + """ + return a list of version of the CTType supported + """ + #add here the other version supported + verCTTypeList = {} + verCTTypeList[self.MEDIA_TYPE['TEXT_VCARD']]=('3.0',) + verCTTypeList[self.MEDIA_TYPE['TEXT_XVCARD']]=('2.1',) + return verCTTypeList[capabilities_ct_type] + + + def getPreferedCapabilitieVerCT(self): + """ + return the prefered capabilitie VerCT + """ + prefered_version = '2.1' + return prefered_version + + def getPreferedCapabilitieCTType(self): + """ + return the prefered capabilitie VerCT + """ + prefered_type = self.MEDIA_TYPE['TEXT_XVCARD'] + return prefered_type + + def getGidFromXML(self, vcard): + """ + return the Gid composed of FirstName and LastName + """ + vcard_dict = self.vcard2Dict(vcard) + gid_from_vcard = [] + gid_from_vcard.append(vcard_dict['first_name']) + gid_from_vcard.append(' ') + gid_from_vcard.append(vcard_dict['last_name']) + gid_from_vcard = ''.join(gid_from_vcard) + return gid_from_vcard + + def changePropertyEncoding(self, property_parameters_list, + property_value_list): + """ + if there is a property 'ENCODING', change the string encoding to utf-8 + """ + encoding='' + + for item in property_parameters_list : + if item.has_key('ENCODING'): + encoding = item['ENCODING'] + + property_value_list_well_incoded=[] + if encoding == 'QUOTED-PRINTABLE': + import mimify + for property_value in property_value_list: + property_value = mimify.mime_decode(property_value) + property_value_list_well_incoded.append(property_value) + #elif ... put here the other encodings + else: + property_value_list_well_incoded=property_value_list + + return property_value_list_well_incoded + + def vcard2Dict(self, vcard): + """ + transalate the vcard to a dict understandable by erp5 like + {'fisrt_name':'MORIN', 'last_name':'Fabien'} + """ + #LOG('vcard =',0,vcard) + convert_dict = {} + convert_dict['FN'] = 'first_name' + convert_dict['N'] = 'last_name' + convert_dict['TEL'] = 'default_telephone_text' + edit_dict = {} + vcard_list = vcard.split('\n') + for vcard_line in vcard_list: + if ':' in vcard_line: + property, property_value = vcard_line.split(':') + property_value_list=property_value.split(';') + property_parameters_list = [] + property_name = '' + if ';' in property: + property_list = property.split(';') + property_name = property_list[0] #the property name is the 1st element + if len(property_list) > 1 and property_list[1] != '': + property_parameters_list = property_list[1:len(property_list)] + tmp = [] + for property_parameter in property_parameters_list: + if '=' in property_parameter: + property_parameter_name, property_parameter_value = \ + property_parameter.split('=') + else: + property_parameter_name = property_parameter + property_parameter_value = None + tmp.append({property_parameter_name:property_parameter_value}) + property_parameters_list = tmp + #now property_parameters_list looks like : + # [{'ENCODING':'QUOTED-PRINTABLE'}, {'CHARSET':'UTF-8'}] + + property_value_list = \ + self.changePropertyEncoding(property_parameters_list, + property_value_list) + + else: + property_name=property + if type(property_name) is type(u'a'): + property_name = property_name.encode('utf-8') + + tmp=[] + for property_value in property_value_list: + if type(property_value) is type(u'a'): + property_value = property_value.encode('utf-8') + tmp.append(property_value) + property_value_list=tmp + if property_name in convert_dict.keys(): + if property_name == 'N' and len(property_value_list) > 1: + edit_dict[convert_dict['N']]=property_value_list[0] + edit_dict[convert_dict['FN']]=property_value_list[1] + else: + edit_dict[convert_dict[property_name]]=property_value_list[0] + #LOG('edit_dict =',0,edit_dict) + return edit_dict + diff --git a/product/ERP5SyncML/tests/testERP5SyncMLVCard.py b/product/ERP5SyncML/tests/testERP5SyncMLVCard.py new file mode 100755 index 0000000000000000000000000000000000000000..f8c89c6ff01cd9962b4b2e4d06bd8005b5dedebc --- /dev/null +++ b/product/ERP5SyncML/tests/testERP5SyncMLVCard.py @@ -0,0 +1,227 @@ +############################################################################## +# vim: set fileencoding=utf-8 +# +# Copyright (c) 2007 Nexedi SARL and Contributors. All Rights Reserved. +# Fabien Morin <fabien.morin@gmail.com> +# +# WARNING: This program as such is intended to be used by professional +# programmers who take the whole responsability of assessing all potential +# consequences resulting from its eventual inadequacies and bugs +# End users who are looking for a ready-to-use solution with commercial +# garantees and support are strongly adviced to contract a Free Software +# Service Company +# +# This program is Free Software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. +# +############################################################################## + + +import os, sys +if __name__ == '__main__': + execfile(os.path.join(sys.path[0], 'framework.py')) + +# Needed in order to have a log file inside the current folder +os.environ['EVENT_LOG_FILE'] = os.path.join(os.getcwd(), 'zLOG.log') +os.environ['EVENT_LOG_SEVERITY'] = '-300' + +from Testing import ZopeTestCase +from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase +from AccessControl.SecurityManagement import newSecurityManager +from Products.ERP5SyncML.Conduit.VCardConduit import VCardConduit +from Products.ERP5SyncML.SyncCode import SyncCode +from testERP5SyncML import TestERP5SyncMLMixin +from zLOG import LOG + +class TestERP5SyncMLVCard(TestERP5SyncMLMixin, ERP5TypeTestCase): + + run_all_test = True + + def getBusinessTemplateList(self): + """ + Return the list of business templates. + + the business template sync_crm give 3 folders: + /person_server + /person_client1 : empty + /person_client2 : empty + """ + return ('erp5_base','fabien_bt') + + def test_01_AddVCardPublication(self, quiet=0, run=run_all_test): + if not run: return + if not quiet: + ZopeTestCase._print('\nTest Add a VCard Publication ') + LOG('Testing... ',0,'test_36_AddVCardPublication') + portal_id = self.getPortalName() + portal_sync = self.getSynchronizationTool() + portal_sync.manage_addPublication(self.pub_id, self.publication_url, + '/%s/person_server' % portal_id, 'Person', 'objectValues', + 'Person_exportAsVCard', 'VCardConduit', '', 'generateNewId', + 'getId', SyncCode.MEDIA_TYPE['TEXT_VCARD']) + pub = portal_sync.getPublication(self.pub_id) + self.failUnless(pub is not None) + + def test_02_AddVCardSubscription1(self, quiet=0, run=run_all_test): + if not run: return + if not quiet: + ZopeTestCase._print('\nTest Add First VCard Subscription ') + LOG('Testing... ',0,'test_02_AddVCardSubscription1') + portal_id = self.getPortalId() + portal_sync = self.getSynchronizationTool() + portal_sync.manage_addSubscription(self.sub_id1, self.publication_url, + self.subscription_url1, '/%s/person_client1' % portal_id, + 'Person', 'Person', 'objectValues', 'Person_exportAsVCard', + 'VCardConduit', '', 'generateNewId', 'getId', + SyncCode.MEDIA_TYPE['TEXT_VCARD']) + sub = portal_sync.getSubscription(self.sub_id1) + self.failUnless(sub is not None) + + def test_03_AddVCardSubscription2(self, quiet=0, run=run_all_test): + if not run: return + if not quiet: + ZopeTestCase._print('\nTest Add Second VCard Subscription ') + LOG('Testing... ',0,'test_03_AddVCardSubscription2') + portal_id = self.getPortalId() + portal_sync = self.getSynchronizationTool() + portal_sync.manage_addSubscription(self.sub_id2, self.publication_url, + self.subscription_url2, '/%s/person_client2' % portal_id, + 'Person', 'Person', 'objectValues', 'Person_exportAsVCard', + 'VCardConduit', '', 'generateNewId', 'getId', + SyncCode.MEDIA_TYPE['TEXT_VCARD']) + sub = portal_sync.getSubscription(self.sub_id2) + self.failUnless(sub is not None) + + def test_04_FirstVCardSynchronization(self, quiet=0, run=run_all_test): + # We will try to populate the folder person_client1 + # with the data form person_server + if not run: return + if not quiet: + ZopeTestCase._print('\nTest First VCard Synchronization ') + LOG('Testing... ',0,'test_04_FirstVCardSynchronization') + self.login() + self.test_01_AddVCardPublication(quiet=True, run=True) + self.test_02_AddVCardSubscription1(quiet=True, run=True) + self.test_03_AddVCardSubscription2(quiet=True, run=True) + nb_person = self.populatePersonServer(quiet=1,run=1) + portal_sync = self.getSynchronizationTool() + for sub in portal_sync.getSubscriptionList(): + self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC) + # Synchronize the first client + nb_message1 = self.synchronize(self.sub_id1) + for sub in portal_sync.getSubscriptionList(): + if sub.getTitle() == self.sub_id1: + self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY) + else: + self.assertEquals(sub.getSynchronizationType(),SyncCode.SLOW_SYNC) + self.failUnless(nb_message1==self.nb_message_first_synchronization) + # Synchronize the second client + nb_message2 = self.synchronize(self.sub_id2) + for sub in portal_sync.getSubscriptionList(): + self.assertEquals(sub.getSynchronizationType(),SyncCode.TWO_WAY) + self.failUnless(nb_message2==self.nb_message_first_synchronization) + self.checkFirstSynchronization(id='1', nb_person=nb_person) + + def test_05_basicVCardSynchronization(self, quiet=0, run=run_all_test): + """ + synchronize two ERP5Sites using VCards + """ + + if not run: return + if not quiet: + ZopeTestCase._print('\nTest Basic VCard Synchronization') + LOG('Testing... ',0,'test_05_basicVCardSynchronization') + + self.test_04_FirstVCardSynchronization(quiet=True, run=True) + + + portal_sync = self.getSynchronizationTool() + person_server = self.getPersonServer() + person1_s = person_server._getOb(self.id1) + person_client1 = self.getPersonClient1() + person1_c = person_client1._getOb('1') #The new person is added with a + #generate id (the first is 1) + + # try to synchronize + kw = {'first_name':self.first_name3,'last_name':self.last_name3} + person1_c.edit(**kw) + #before synchornization, First and Last name souldn't be the same + self.verifyFirstNameAndLastNameAreNotSynchronized(self.first_name3, + self.last_name3, person1_s, person1_c) + self.synchronize(self.sub_id1) + #after synchronization, a new person is create on the server + person1_s = person_server._getOb('1') #The new person is added on the + #serverwith a generate id (the first is 1) + + #after the synchro, the client and server should be synchronized + self.checkSynchronizationStateIsSynchronized() + self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3, + self.last_name3, person1_s, person1_c) + + def test_05_verifyNoDuplicateDataWhenAdding(self, quiet=0, run=run_all_test): + """ + this test permit to verify that if the server already have the person, + he don't add it a second time + """ + if not run: return + if not quiet: + ZopeTestCase._print('\nTest No Duplicate Data When Adding') + LOG('Testing... ',0,'test_05_verifyNoDuplicateDataWhenAdding') + self.test_04_FirstVCardSynchronization(quiet=True, run=True) + portal_sync = self.getSynchronizationTool() + sub1 = portal_sync.getSubscription(self.sub_id1) + sub2 = portal_sync.getSubscription(self.sub_id2) + pub = portal_sync.getPublication(self.pub_id) + + person_server = self.getPersonServer() + person1_s = person_server._getOb(self.id1) + person_client1 = self.getPersonClient1() + person1_c = person_client1._getOb('1') #The new person is added with a + #generate id (the first is 1) + + # try to synchronize + kw = {'first_name':self.first_name3,'last_name':self.last_name3} + person1_c.edit(**kw) + person1_s.edit(**kw) #the same person is added on client AND server + #before synchornization, First and Last name souldn't be the same + self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3, + self.last_name3, person1_s, person1_c) + nb_person_serv_before_sync = len(pub.getObjectList()) + self.synchronize(self.sub_id1) + #after synchronization, no new person is created on server because it + #already have this person + #person1_s = person_server._getOb('1') #The new person is added on the + #serverwith a generate id (the first is 1) + + #after the synchro, the client and server should be synchronized + self.checkSynchronizationStateIsSynchronized() + self.verifyFirstNameAndLastNameAreSynchronized(self.first_name3, + self.last_name3, person1_s, person1_c) + + nb_person_serv_after_sync = len(pub.getObjectList()) + #the number of person on server before and after the synchronization should + #be the same + nb_person_serv_after_sync = len(pub.getObjectList()) + self.failUnless(nb_person_serv_after_sync==nb_person_serv_before_sync) + + + +if __name__ == '__main__': + framework() +else: + import unittest + def test_suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(TestERP5SyncMLVCard)) + return suite