# -*- coding: utf-8 -*- ############################################################################## # # Copyright (c) 2009 Nexedi SA and Contributors. All Rights Reserved. # Nicolas Delaby <nicolas@nexedi.com> # # WARNING: This program as such is intended to be used by professional # programmers who take the whole responsability of assessing all potential # consequences resulting from its eventual inadequacies and bugs # End users who are looking for a ready-to-use solution with commercial # garantees and support are strongly adviced to contract a Free Software # Service Company # # This program is Free Software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # ############################################################################## import unittest import time import transaction from Testing import ZopeTestCase from testDms import TestDocumentMixin from Products.ERP5Type.tests.utils import FileUpload from Products.ERP5Type.tests.utils import DummyLocalizer from AccessControl.SecurityManagement import newSecurityManager from Products.ERP5Type.Cache import DEFAULT_CACHE_SCOPE from zLOG import LOG import os TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document') FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?" def makeFilePath(name): return os.path.join(os.path.dirname(__file__), 'test_document', name) def makeFileUpload(name, as_name=None): if as_name is None: as_name = name path = makeFilePath(name) return FileUpload(path, as_name) class TestDocumentConversionCache(TestDocumentMixin): """ Test basic document - related operations """ failed_format_list = (#'fodt', #'bib', #'writer.xhtml', #'pdb', #'psw', #'tex', #'wiki.txt', #'uot', #'2003.doc.xml', #'docbook.xml', ) def getTitle(self): return "OOo Conversion Cache" def beforeTearDown(self): """ Do some stuff after each test: - clear document module """ self.clearDocumentModule() ## tests def test_01_HasEverything(self): """ Standard test to make sure we have everything we need - all the tools etc """ print '\nTest Has Everything ' self.assertNotEqual(self.getCategoryTool(), None) self.assertNotEqual(self.getSimulationTool(), None) self.assertNotEqual(self.getTypeTool(), None) self.assertNotEqual(self.getSQLConnection(), None) self.assertNotEqual(self.getCatalogTool(), None) self.assertNotEqual(self.getWorkflowTool(), None) def test_01_PersistentCacheConversion(self): """ Test Conversion Cache mechanism """ print '\nPersistent Cache Conversion' filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document = self.portal.portal_contributions.newContent(file=file) transaction.commit() self.tic() document_url = document.getRelativeUrl() document = self.portal.restrictedTraverse(document_url) format_list = [format for format in document.getTargetFormatList() if format not in self.failed_format_list] if not format_list: self.fail('Target format list is empty') #Test Conversion Cache for format in format_list: document.convert(format=format) transaction.commit() self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) document.edit(title='Foo') transaction.commit() #Test Cache is cleared for format in format_list: self.assertFalse(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertEquals(document.getConversionSize(format=format), 0) document.edit(title='Bar') transaction.commit() #Test Conversion Cache after editing for format in format_list: document.convert(format=format) transaction.commit() self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) def test_02_VolatileCacheConversionOfTempObject(self): """ Test Conversion Cache mechanism """ print '\nVolatile Cache Conversion of temp objects' filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document = self.portal.portal_contributions.newContent(file=file, temp_object=1) document.uploadFile() document.processFile() document.convertToBaseFormat() format_list = [format for format in document.getTargetFormatList() if format not in self.failed_format_list] if not format_list: self.fail('Target format list is empty') #Test Conversion Cache for format in format_list: document.convert(format=format) transaction.commit() self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) document.edit(title='Foo') transaction.commit() #Test Cache is cleared for format in format_list: self.assertFalse(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertEqual(document.getConversionSize(format=format), 0) document.edit(title='Bar') transaction.commit() #Test Conversion Cache after editing for format in format_list: document.convert(format=format) transaction.commit() self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) def test_03_CacheConversionOfTempObjectIsNotMixed(self): """ Test Conversion Cache mechanism """ print '\nCache Conversion of temp objects is not mixed' filename1 = 'TEST-en-002.doc' filename2 = 'TEST-en-002.odt' file1 = makeFileUpload(filename1) file2 = makeFileUpload(filename2) document1 = self.portal.portal_contributions.newContent(file=file1, temp_object=1) document1.uploadFile() document1.processFile() document1.convertToBaseFormat() document2 = self.portal.portal_contributions.newContent(file=file2, temp_object=1) document2.uploadFile() document2.processFile() document2.convertToBaseFormat() format = 'pdf' document1.convert(format=format) document2.convert(format=format) self.assertNotEqual(document1.getConversion(format=format), document2.getConversion(format=format)) def test_04_PersistentCacheConversionWithFlare(self): """ Test Conversion Cache mechanism """ print '\nPersistent Cache Conversion with Flare' default_pref = self.portal.portal_preferences.default_site_preference default_pref.setPreferredConversionCacheFactory('dms_cache_factory') #old preferred value is still cached self.portal.portal_caches.clearAllCache() transaction.commit() self.tic() filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document = self.portal.portal_contributions.newContent(file=file) transaction.commit() self.tic() document_url = document.getRelativeUrl() document = self.portal.restrictedTraverse(document_url) format_list = [format for format in document.getTargetFormatList()\ if format not in self.failed_format_list] if not format_list: self.fail('Target format list is empty') #Test Conversion Cache for format in format_list: document.convert(format=format) self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) document.edit(title='Foo') transaction.commit() #Test Cache is cleared for format in format_list: self.assertFalse(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertEqual(document.getConversionSize(format=format), 0) document.edit(title='Bar') transaction.commit() #Test Conversion Cache after editing for format in format_list: document.convert(format=format) self.assertTrue(document.hasConversion(format=format), 'Cache Storage failed for %s' % (format)) self.assertTrue(document.getConversionSize(format=format)) def test_05_checksum_conversion(self): """ Test Conversion Cache return expected value with checksum """ print '\nCheck checksum in Conversion' filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document = self.portal.portal_contributions.newContent(file=file) transaction.commit() self.tic() document_url = document.getRelativeUrl() document = self.portal.restrictedTraverse(document_url) kw = {'format': 'html'} #Generate one conversion document.convert(**kw) cache_id = '%s%s' % (document._getCacheKey(), document.generateCacheId(**kw)) cache_factory = document._getCacheFactory() for cache_plugin in cache_factory.getCachePluginList(): cache_entry = cache_plugin.get(cache_id, DEFAULT_CACHE_SCOPE) md5sum, mime, data = cache_entry.getValue() #get data from cache self.assertTrue(md5sum) self.assertTrue(mime) self.assertTrue(data) #Change md5 manualy cache_plugin.set(cache_id, DEFAULT_CACHE_SCOPE, ('Anything which is not md5', mime, data), 0, 0) self.assertRaises(KeyError, document.getConversion, format='html') def test_06_check_md5_is_updated(self): """ Check that md5 checksum is well updated when upload a file """ print '\nCheck checksum is updated' filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document = self.portal.portal_contributions.newContent(file=file) transaction.commit() self.tic() document_url = document.getRelativeUrl() document = self.portal.restrictedTraverse(document_url) md5sum = document.getContentMd5() self.assertTrue(md5sum) filename2 = 'TEST-en-002.odt' file2 = makeFileUpload(filename2) document.edit(file=file2) self.assertNotEqual(md5sum, document.getContentMd5()) def test_07_check_cache_key_is_escaped(self): """ Check that key (based on path of document) support unauthorised chars """ print '\nCheck key (based on path) support unauthorised chars' default_pref = self.portal.portal_preferences.default_site_preference default_pref.setPreferredConversionCacheFactory('dms_cache_factory') #old preferred value is still cached self.portal.portal_caches.clearAllCache() transaction.commit() self.tic() filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document_id = 'an id with spaces' document = self.portal.portal_contributions.newContent(id=document_id, file=file) transaction.commit() self.tic() document_url = document.getRelativeUrl() document = self.portal.restrictedTraverse(document_url) self.assertEquals(document.getId(), document_id) document.convert(format='txt') self.assertTrue(document.getConversion(format='txt')) def test_08_check_conversion_cache_with_portal_document_type_list(self): """Check cache conversion for all Portal Document Types """ print '\nCheck cache conversion for all Portal Document Types' portal_type_list = list(self.portal.getPortalDocumentTypeList()) if 'File' in portal_type_list: #File conversion is not implemented portal_type_list.remove('File') data_mapping = {'Drawing': 'TEST-en-002.sxd', 'Text': 'TEST-en-002.doc', 'Spreadsheet': 'TEST-en-002.sxc', 'Presentation': 'TEST-en-002.sxi', 'Web Page': 'TEST-en-002.html', 'Image': 'TEST-en-002.gif', #'File': 'TEST-en-002.rtf', 'PDF': 'TEST-en-002.pdf'} #Check that all portal_types are handled by test self.assertEqual(len(portal_type_list), len([pt for pt in portal_type_list if pt in data_mapping])) for portal_type in portal_type_list: module = self.portal.getDefaultModule(portal_type=portal_type) upload_file = makeFileUpload(data_mapping[portal_type]) document = module.newContent(portal_type=portal_type) document.edit(file=upload_file) transaction.commit() self.tic() document.convert(format='txt') document.convert(format='html') self.assertTrue(document.getConversion(format='txt')) self.assertTrue(document.getConversion(format='html')) def test_suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(TestDocumentConversionCache)) return suite # vim: syntax=python shiftwidth=2