From afd84b017420d17eebf5759d89f740acc458af8a Mon Sep 17 00:00:00 2001 From: Nicolas Delaby <nicolas@nexedi.com> Date: Wed, 1 Dec 2010 14:55:46 +0000 Subject: [PATCH] s/file_name/filename/ s/source_reference/filename/ update tests Add test to check Hackability of ContributionTool (Everything can be managed by IDiscoverable API) git-svn-id: https://svn.erp5.org/repos/public/erp5/trunk@40972 20353a03-c40f-0410-a6d1-a30d3c3de9de --- product/ERP5OOo/Document/OOoDocument.py | 24 +- product/ERP5OOo/tests/testDms.py | 116 +-- product/ERP5OOo/tests/testIngestion.py | 897 ++++++++++++++---- .../ERP5OOo/tests/testOOoConversionCache.py | 10 +- 4 files changed, 778 insertions(+), 269 deletions(-) diff --git a/product/ERP5OOo/Document/OOoDocument.py b/product/ERP5OOo/Document/OOoDocument.py index 94aa646637c..7b635d96a8d 100644 --- a/product/ERP5OOo/Document/OOoDocument.py +++ b/product/ERP5OOo/Document/OOoDocument.py @@ -383,7 +383,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi temp_image = self.portal_contributions.newContent( portal_type='Image', file=cStringIO.StringIO(), - file_name=self.getId(), + filename=self.getId(), temp_object=1) temp_image._setData(data) # we care for first page only but as well for image quality @@ -420,23 +420,23 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi else: must_close = 0 for f in zip_file.infolist(): - file_name = f.filename - document = self.get(file_name, None) + filename = f.filename + document = self.get(filename, None) if document is not None: - self.manage_delObjects([file_name]) # For compatibility with old implementation - if file_name.endswith('html'): + self.manage_delObjects([filename]) # For compatibility with old implementation + if filename.endswith('html'): mime = 'text/html' # call portal_transforms to strip HTML in safe mode portal = self.getPortalObject() transform_tool = getToolByName(portal, 'portal_transforms') data = transform_tool.convertToData('text/x-html-safe', - zip_file.read(file_name), + zip_file.read(filename), object=self, context=self, mimetype=mime) else: - mime = guess_content_type(file_name)[0] - data = Pdata(zip_file.read(file_name)) - self.setConversion(data, mime=mime, format=EMBEDDED_FORMAT, file_name=file_name) + mime = guess_content_type(filename)[0] + data = Pdata(zip_file.read(filename)) + self.setConversion(data, mime=mime, format=EMBEDDED_FORMAT, filename=filename) if must_close: zip_file.close() archive_file.close() @@ -450,7 +450,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi """ server_proxy = OOoServerProxy(self) response_code, response_dict, response_message = server_proxy.run_convert( - self.getSourceReference() or self.getId(), + self.getFilename() or self.getId(), enc(str(self.getData())), None, None, @@ -468,9 +468,7 @@ class OOoDocument(OOoDocumentExtensibleTraversableMixin, BaseConvertableFileMixi "OOoDocument: Error converting document to base format %s:%s:" % (response_code, response_message)) - security.declareProtected(Permissions.AccessContentsInformation, - 'getContentInformation') - def getContentInformation(self): + def _getContentInformation(self): """ Returns the metadata extracted by the conversion server. diff --git a/product/ERP5OOo/tests/testDms.py b/product/ERP5OOo/tests/testDms.py index 8d05ab651ab..2802f39ad91 100644 --- a/product/ERP5OOo/tests/testDms.py +++ b/product/ERP5OOo/tests/testDms.py @@ -74,11 +74,12 @@ import difflib from AccessControl import Unauthorized from Products.ERP5Type import Permissions from Products.ERP5Type.tests.backportUnittest import expectedFailure +from Products.ERP5.Tool.ContributionTool import AlreadyIngestedUrlError QUIET = 0 TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document') -FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" +FILENAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?" def makeFilePath(name): @@ -114,7 +115,7 @@ class TestDocumentMixin(ERP5TypeTestCase): conversion_dict = _getConversionServerDict() default_pref.setPreferredOoodocServerAddress(conversion_dict['hostname']) default_pref.setPreferredOoodocServerPortNumber(conversion_dict['port']) - default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION) + default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION) default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION) if self.portal.portal_workflow.isTransitionPossible(default_pref, 'enable'): default_pref.enable() @@ -193,14 +194,14 @@ class TestDocument(TestDocumentMixin): ## helper methods - def createTestDocument(self, file_name=None, portal_type='Text', reference='TEST', version='002', language='en'): + def createTestDocument(self, filename=None, portal_type='Text', reference='TEST', version='002', language='en'): """ Creates a text document """ dm=self.getPortal().document_module doctext=dm.newContent(portal_type=portal_type) - if file_name is not None: - f = open(makeFilePath(file_name), 'rb') + if filename is not None: + f = open(makeFilePath(filename), 'rb') doctext.setTextContent(f.read()) f.close() doctext.setReference(reference) @@ -585,7 +586,7 @@ class TestDocument(TestDocumentMixin): # tests that owners can download OOo documents, and all headers (including # filenames) are set correctly doc = self.portal.document_module.newContent( - source_reference='test.ods', + filename='test.ods', portal_type='Spreadsheet') doc.edit(file=makeFileUpload('import_data_list.ods')) @@ -608,7 +609,7 @@ class TestDocument(TestDocumentMixin): # tests that members can download OOo documents in pdf format (at least in # published state), and all headers (including filenames) are set correctly doc = self.portal.document_module.newContent( - source_reference='test.ods', + filename='test.ods', portal_type='Spreadsheet') doc.edit(file=makeFileUpload('import.file.with.dot.in.filename.ods')) doc.publish() @@ -1276,32 +1277,28 @@ class TestDocument(TestDocumentMixin): upload_file = makeFileUpload('REF-en-001.pdf') document = self.portal.document_module.newContent(portal_type='PDF') # Here we use edit instead of setFile, - # because only edit method set filename as source_reference. + # because only edit method set filename as filename. document.edit(file=upload_file) self.assertEquals('application/pdf', document.getContentType()) - def test_Document_getStandardFileName(self): + def test_Document_getStandardFilename(self): upload_file = makeFileUpload('metadata.pdf') document = self.portal.document_module.newContent(portal_type='PDF') - # Here we use edit instead of setFile, - # because only edit method set filename as source_reference. document.edit(file=upload_file) - self.assertEquals(document.getStandardFileName(), 'metadata.pdf') - self.assertEquals(document.getStandardFileName(format='png'), + self.assertEquals(document.getStandardFilename(), 'metadata.pdf') + self.assertEquals(document.getStandardFilename(format='png'), 'metadata.png') document.setVersion('001') document.setLanguage('en') - self.assertEquals(document.getStandardFileName(), 'metadata-001-en.pdf') - self.assertEquals(document.getStandardFileName(format='png'), + self.assertEquals(document.getStandardFilename(), 'metadata-001-en.pdf') + self.assertEquals(document.getStandardFilename(format='png'), 'metadata-001-en.png') # check when format contains multiple '.' upload_file = makeFileUpload('TEST-en-003.odp') document = self.portal.document_module.newContent(portal_type='Presentation') - # Here we use edit instead of setFile, - # because only edit method set filename as source_reference. document.edit(file=upload_file) - self.assertEquals(document.getStandardFileName(), 'TEST-en-003.odp') - self.assertEquals('TEST-en-003.odg', document.getStandardFileName(format='odp.odg')) + self.assertEquals(document.getStandardFilename(), 'TEST-en-003.odp') + self.assertEquals('TEST-en-003.odg', document.getStandardFilename(format='odp.odg')) def test_CMYKImageTextContent(self): @@ -1320,14 +1317,10 @@ class TestDocument(TestDocumentMixin): self.stepTic() self.assertEquals('converted', document.getExternalProcessingState()) - # Upload different type of file inside which can not be converted to base format - upload_file = makeFileUpload('REF-en-001.pdf') - document.edit(file=upload_file) + # Delete base_data + document.edit(base_data=None) self.stepTic() - self.assertEquals('application/pdf', document.getContentType()) - self.assertEquals('conversion_failed', document.getExternalProcessingState()) - # As document is not converted, text convertion is impossible - # But document can still be retrive with portal catalog + # As document is not converted, text conversion is impossible self.assertRaises(NotConvertedError, document.asText) self.assertRaises(NotConvertedError, document.getSearchableText) self.assertEquals('This document is not converted yet.', @@ -1646,6 +1639,28 @@ document.write('<sc'+'ript type="text/javascript" src="http://somosite.bg/utb.ph self.assertTrue('AZERTYY' not in safe_html) self.assertTrue('#FFAA44' in safe_html) + @expectedFailure + def test_safeHTML_impossible_conversion(self): + """Some html are not parsable. + """ + web_page_portal_type = 'Web Page' + module = self.portal.getDefaultModule(web_page_portal_type) + web_page = module.newContent(portal_type=web_page_portal_type) + # very dirty html + html_content = """ + <html> + <body> + <p><a href="http://www.example.com/category/html/" style="font-weight: bold; color: rgb(0, 0, 0); font-size: 90.8777%; text-decoration: none;" title="catégorie how to write valid html d" alt="Diancre pas d" accord="" :="" 6="" articles="">Its french</a></p> + </body> + </html> +""" + web_page.edit(text_content=html_content) + from HTMLParser import ParserError + try: + web_page.asStrippedHTML() + except ParserError: + self.fail('Even BeautifulSoup is not able to parse such HTML') + def test_parallel_conversion(self): """Check that conversion engine is able to fill in cache without overwrite previous conversion @@ -1768,7 +1783,8 @@ document.write('<sc'+'ript type="text/javascript" src="http://somosite.bg/utb.ph upload_file = makeFileUpload('TEST-text-iso8859-1.txt') web_page = module.newContent(portal_type=web_page_portal_type, file=upload_file) - + transaction.commit() + self.tic() text_content = web_page.getTextContent() my_utf_eight_token = 'ùééà çèîà ' text_content = text_content.replace('\n', '\n%s\n' % my_utf_eight_token) @@ -1798,9 +1814,9 @@ return 1 transaction.commit() def _test_document_conversion_to_base_format_no_original_format_access(self, - portal_type, file_name): + portal_type, filename): module = self.portal.getDefaultModule(portal_type) - upload_file = makeFileUpload(file_name) + upload_file = makeFileUpload(filename) document = module.newContent(portal_type=portal_type, file=upload_file) @@ -1869,48 +1885,6 @@ return 1 self.assertTrue('Continue' in response.getBody()) self.assertTrue('Last page' in response.getBody()) - def test_contributeLink(self): - """ - Test contributing a link. - """ - portal = self.portal - kw = {'url':portal.absolute_url()} - web_page_1 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_1.getRevision()=='2') - - web_page_2 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_1==web_page_2) - self.assertTrue(web_page_2.getRevision()=='3') - - web_page_3 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_2==web_page_3) - self.assertTrue(web_page_3.getRevision()=='4') - - # test in synchronous mode - kw['synchronous_metadata_discovery']=True - web_page_4 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_3==web_page_4) - self.assertTrue(web_page_4.getRevision()=='5') - - web_page_5 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_4==web_page_5) - self.assertTrue(web_page_5.getRevision()=='6') - - web_page_6 = portal.Base_contribute(**kw) - self.stepTic() - self.assertTrue(web_page_5==web_page_6) - self.assertTrue(web_page_6.getRevision()=='7') - - # test contribute link is a safe html (duplicates parts of test_safeHTML_conversion) - web_page_6_entire_html = web_page_6.asEntireHTML() - self.assertTrue('<script' not in web_page_6_entire_html) - self.assertTrue('<javascript' not in web_page_6_entire_html) - def test_getTargetFormatItemList(self): """ Test getting target conversion format item list. diff --git a/product/ERP5OOo/tests/testIngestion.py b/product/ERP5OOo/tests/testIngestion.py index 00abd6aaabb..fc77069de49 100644 --- a/product/ERP5OOo/tests/testIngestion.py +++ b/product/ERP5OOo/tests/testIngestion.py @@ -40,7 +40,7 @@ from Products.ERP5Type.Utils import convertToUpperCase from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase,\ _getConversionServerDict from Products.ERP5Type.tests.Sequence import SequenceList -from Products.ERP5Type.tests.utils import FileUpload +from Products.ERP5Type.tests.utils import FileUpload, createZODBPythonScript from Products.ERP5OOo.Document.OOoDocument import ConversionError from Products.ERP5OOo.OOoUtils import OOoBuilder from zLOG import LOG, INFO, ERROR @@ -48,7 +48,7 @@ from Products.CMFCore.utils import getToolByName # test files' home TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document') -FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" +FILENAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z&é@{]{3,7})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?" @@ -98,6 +98,17 @@ class TestIngestion(ERP5TypeTestCase): self.setSimulatedNotificationScript() def beforeTearDown(self): + # cleanup modules + module_id_list = """web_page_module + document_module + image_module + external_source_module + """.split() + for module_id in module_id_list: + module = self.portal[module_id] + module.manage_delObjects([id for id in module.objectIds()]) + transaction.commit() + self.tic() activity_tool = self.portal.portal_activities activity_status = set(m.processing_node < -1 for m in activity_tool.getMessageList()) @@ -106,13 +117,31 @@ class TestIngestion(ERP5TypeTestCase): else: assert not activity_status self.portal.portal_caches.clearAllCache() + # Cleanup portal_skins + script_id_list = ('Document_getPropertyDictFromContent', + 'Document_getPropertyDictFromInput', + 'Document_getPropertyDictFromFilename', + 'Document_getPropertyDictFromUserLogin', + 'Document_finishIngestion', + 'Document_getPreferredDocumentMetadataDiscoveryOrderList', + 'Text_getPropertyDictFromContent', + 'Text_getPropertyDictFromInput', + 'Text_getPropertyDictFromFilename', + 'Text_getPropertyDictFromUserLogin', + 'Text_finishIngestion', + 'Text_getPreferredDocumentMetadataDiscoveryOrderList',) + skin_tool = self.portal.portal_skins + for script_id in script_id_list: + if script_id in skin_tool.custom.objectIds(): + skin_tool.custom._delObject(script_id) + transaction.commit() def setSystemPreference(self): default_pref = self.portal.portal_preferences.default_site_preference conversion_dict = _getConversionServerDict() default_pref.setPreferredOoodocServerAddress(conversion_dict['hostname']) default_pref.setPreferredOoodocServerPortNumber(conversion_dict['port']) - default_pref.setPreferredDocumentFileNameRegularExpression(FILE_NAME_REGULAR_EXPRESSION) + default_pref.setPreferredDocumentFilenameRegularExpression(FILENAME_REGULAR_EXPRESSION) default_pref.setPreferredDocumentReferenceRegularExpression(REFERENCE_REGULAR_EXPRESSION) if default_pref.getPreferenceState() != 'global': default_pref.enable() @@ -124,10 +153,9 @@ class TestIngestion(ERP5TypeTestCase): context = self.portal.portal_skins.custom script_id = 'Document_notifyByEmail' if not hasattr(context, script_id): - factory = context.manage_addProduct['PythonScripts'].manage_addPythonScript - factory(id=script_id) - script = getattr(context, script_id) - script.ZPythonScript_edit('email_to, event, doc, **kw', 'return') + + createZODBPythonScript(context, script_id, + 'email_to, event, doc, **kw', 'return') def createDefaultCategoryList(self): """ @@ -208,47 +236,16 @@ class TestIngestion(ERP5TypeTestCase): categories.append(category) return categories - def getDocument(self, id): - """ - Returns a document with given ID in the - document module. - """ - document_module = self.portal.document_module - return getattr(document_module, id) - - def checkIsObjectCatalogged(self, portal_type, **kw): - """ - Make sure that a document with given portal type - and kw properties is already present in the catalog. - - Typical use of this method consists in providing - an id or reference. - """ - res = self.portal_catalog(portal_type=portal_type, **kw.copy()) - self.assertEquals(len(res), 1) - for key, value in kw.items(): - self.assertEquals(res[0].getProperty(key), value) - - def newEmptyCataloggedDocument(self, portal_type, id): + def newEmptyDocument(self, portal_type): """ Create an empty document of given portal type and given ID. - Documents are immediately catalogged and verified - both form catalog point of view and from their - presence in the document module. """ document_module = self.portal.getDefaultModule(portal_type) - document = getattr(document_module, id, None) - if document is not None: - document_module.manage_delObjects([id,]) - document = document_module.newContent(portal_type=portal_type, id=id) - self.stepTic() - self.checkIsObjectCatalogged(portal_type, id=id, parent_uid=document_module.getUid()) - self.assert_(hasattr(document_module, id)) - return document + return document_module.newContent(portal_type=portal_type) - def ingestFormatList(self, document_id, format_list, portal_type=None): + def ingestFormatList(self, document, format_list): """ Upload in document document_id all test files which match any of the formats in format_list. @@ -260,11 +257,6 @@ class TestIngestion(ERP5TypeTestCase): For every file, this checks is the word "magic" is present in both SearchableText and asText. """ - if portal_type is None: - document_module = self.portal.document_module - else: - document_module = self.portal.getDefaultModule(portal_type) - document = getattr(document_module, document_id) for revision, format in enumerate(format_list): filename = 'TEST-en-002.%s' %format f = makeFileUpload(filename) @@ -280,13 +272,12 @@ class TestIngestion(ERP5TypeTestCase): # check if SearchableText() does not raise any exception document.SearchableText() - def checkDocumentExportList(self, document_id, format, asserted_target_list): + def checkDocumentExportList(self, document, format, asserted_target_list): """ Upload document ID document_id with a test file of given format and assert that the document can be converted to any of the formats in asserted_target_list """ - document = self.getDocument(document_id) filename = 'TEST-en-002.' + format f = makeFileUpload(filename) document.edit(file=f) @@ -295,7 +286,8 @@ class TestIngestion(ERP5TypeTestCase): self.getPortal().portal_caches.clearCache() target_list = document.getTargetFormatList() for target in asserted_target_list: - self.assert_(target in target_list) + self.assertTrue(target in target_list, 'target:%r not in %r' % (target, + target_list,)) def contributeFileList(self, with_portal_type=False): """ @@ -344,57 +336,57 @@ class TestIngestion(ERP5TypeTestCase): self.assertEquals(document.getExternalProcessingState(), 'converted') self.assert_('magic' in document.SearchableText()) - def newPythonScript(self, object_id, script_id, argument_list, code): + def newPythonScript(self, script_id, argument_list, code): """ Creates a new python script with given argument_list and source code. """ - context = self.getDocument(object_id) - context.manage_addProduct['PythonScripts'].manage_addPythonScript(id=script_id) - script = getattr(context, script_id) - script.ZPythonScript_edit(argument_list, code) + context = self.portal.portal_skins.custom + if context._getOb(script_id, None) is not None: + context._delObject(script_id) + createZODBPythonScript(context, script_id, argument_list, code) - def setDiscoveryOrder(self, order, id='one'): + def setDiscoveryOrder(self, order): """ Creates a script to define the metadata discovery order for Text documents. """ script_code = "return %s" % str(order) - self.newPythonScript(id, 'Text_getPreferredDocumentMetadataDiscoveryOrderList', '', script_code) - - def discoverMetadata(self, document_id='one'): + self.newPythonScript('Text_getPreferredDocumentMetadataDiscoveryOrderList', + '', script_code) + + def discoverMetadata(self, document): """ Sets input parameters and on the document ID document_id and discover metadata. For reindexing """ - document = self.getDocument(document_id) - # simulate user input - document._backup_input = dict(reference='INPUT', - language='in', - version='004', - short_title='from_input', - contributor='person_module/james') - # pass to discovery file_name and user_login - document.discoverMetadata(document.getSourceReference(), 'john_doe') + input_parameter_dict = dict(reference='INPUT', + language='in', + version='004', + short_title='from_input', + contributor='person_module/james') + # pass to discovery filename and user_login + document.discoverMetadata(filename=document.getFilename(), + user_login='john_doe', + input_parameter_dict=input_parameter_dict) self.stepTic() - - def checkMetadataOrder(self, expected_metadata, document_id='one'): + + def checkMetadataOrder(self, document, expected_metadata): """ Asserts that metadata of document ID document_id is the same as expected_metadata """ - document = self.getDocument(document_id) for k, v in expected_metadata.items(): self.assertEquals(document.getProperty(k), v) def receiveEmail(self, data, portal_type='Document Ingestion Message', container_path='document_ingestion_module', - file_name='email.emx'): + filename='email.emx'): return self.portal.portal_contributions.newContent(data=data, portal_type=portal_type, container_path=container_path, - file_name=file_name) + filename=filename) ################################## ## Basic steps @@ -422,56 +414,63 @@ class TestIngestion(ERP5TypeTestCase): Create an empty Text document with ID 'one' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('Text', 'one') + document = self.newEmptyDocument('Text') + sequence.edit(document_path=document.getPath()) def stepCreateSpreadsheetDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty Spreadsheet document with ID 'two' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('Spreadsheet', 'two') + document = self.newEmptyDocument('Spreadsheet') + sequence.edit(document_path=document.getPath()) def stepCreatePresentationDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty Presentation document with ID 'three' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('Presentation', 'three') + document = self.newEmptyDocument('Presentation') + sequence.edit(document_path=document.getPath()) def stepCreateDrawingDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty Drawing document with ID 'four' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('Drawing', 'four') + document = self.newEmptyDocument('Presentation') + sequence.edit(document_path=document.getPath()) def stepCreatePDFDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty PDF document with ID 'five' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('PDF', 'five') + document = self.newEmptyDocument('PDF') + sequence.edit(document_path=document.getPath()) def stepCreateImageDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty Image document with ID 'six' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('Image', 'six') + document = self.newEmptyDocument('Image') + sequence.edit(document_path=document.getPath()) def stepCreateFileDocument(self, sequence=None, sequence_list=None, **kw): """ Create an empty File document with ID 'file' This document will be used in most tests. """ - self.newEmptyCataloggedDocument('File', 'file') + document = self.newEmptyDocument('File') + sequence.edit(document_path=document.getPath()) def stepCheckEmptyState(self, sequence=None, sequence_list=None, **kw): """ Check if the document is in "empty" processing state (ie. no file upload has been done yet) """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) return self.assertEquals(document.getExternalProcessingState(), 'empty') def stepCheckUploadedState(self, sequence=None, sequence_list=None, **kw): @@ -479,7 +478,7 @@ class TestIngestion(ERP5TypeTestCase): Check if the document is in "uploaded" processing state (ie. a file upload has been done) """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) return self.assertEquals(document.getExternalProcessingState(), 'uploaded') def stepCheckConvertingState(self, sequence=None, sequence_list=None, **kw): @@ -487,7 +486,7 @@ class TestIngestion(ERP5TypeTestCase): Check if the document is in "converting" processing state (ie. a file upload has been done and the document is converting) """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) return self.assertEquals(document.getExternalProcessingState(), 'converting') def stepCheckConvertedState(self, sequence=None, sequence_list=None, **kw): @@ -496,23 +495,22 @@ class TestIngestion(ERP5TypeTestCase): (ie. a file conversion has been done and the document has been converted) """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) return self.assertEquals(document.getExternalProcessingState(), 'converted') def stepStraightUpload(self, sequence=None, sequence_list=None, **kw): """ Upload a file directly from the form - check if it has the data and source_reference + check if it has the data and filename """ filename = 'TEST-en-002.doc' - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) # First revision is 1 (like web pages) self.assertEquals(document.getRevision(), '1') f = makeFileUpload(filename) document.edit(file=f) self.assert_(document.hasFile()) - # source_reference set to file name ? - self.assertEquals(document.getSourceReference(), filename) + self.assertEquals(document.getFilename(), filename) # Revision is 1 after upload (revisions are strings) self.assertEquals(document.getRevision(), '2') document.reindexObject() @@ -522,7 +520,7 @@ class TestIngestion(ERP5TypeTestCase): """ Upload a file from view form and make sure this increases the revision """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) f = makeFileUpload('TEST-en-002.doc') revision = document.getRevision() document.edit(file=f) @@ -535,7 +533,8 @@ class TestIngestion(ERP5TypeTestCase): Upload a file from contribution. """ f = makeFileUpload('TEST-en-002.doc') - self.portal.portal_contributions.newContent(id='one', file=f) + document = self.portal.portal_contributions.newContent(file=f) + sequence.edit(document_path=document.getPath()) transaction.commit() def stepReuploadTextFromContributionTool(self, sequence=None, sequence_list=None, **kw): @@ -543,7 +542,7 @@ class TestIngestion(ERP5TypeTestCase): Upload a file from contribution form and make sure this update existing document and don't make a new document. """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) revision = document.getRevision() number_of_document = len(self.portal.document_module.objectIds()) self.assert_('This document is modified.' not in document.asText()) @@ -565,10 +564,10 @@ class TestIngestion(ERP5TypeTestCase): Upload another file from contribution. """ f = makeFileUpload('ANOTHE-en-001.doc') - self.portal.portal_contributions.newContent(id='two', file=f) + document = self.portal.portal_contributions.newContent(id='two', file=f) + sequence.edit(document_path=document.getPath()) self.stepTic() - document = self.getDocument('two') - self.assert_('This is a another very interesting document.' in document.asText()) + self.assertTrue('This is a another very interesting document.' in document.asText()) self.assertEquals(document.getReference(), 'ANOTHE') self.assertEquals(document.getVersion(), '001') self.assertEquals(document.getLanguage(), 'en') @@ -579,10 +578,10 @@ class TestIngestion(ERP5TypeTestCase): discovery and we should have basic coordinates immediately, from first stage. """ - document = self.getDocument('one') - file_name = 'TEST-en-002.doc' + document = self.portal.restrictedTraverse(sequence.get('document_path')) + filename = 'TEST-en-002.doc' # First make sure the regular expressions work - property_dict = document.getPropertyDictFromFileName(file_name) + property_dict = document.getPropertyDictFromFilename(filename) self.assertEquals(property_dict['reference'], 'TEST') self.assertEquals(property_dict['language'], 'en') self.assertEquals(property_dict['version'], '002') @@ -593,12 +592,12 @@ class TestIngestion(ERP5TypeTestCase): self.assertEquals(property_dict['description'], 'comments') self.assertEquals(property_dict['subject_list'], ['keywords']) # Then make sure metadata discovery works - f = makeFileUpload(file_name) + f = makeFileUpload(filename) document.edit(file=f) self.assertEquals(document.getReference(), 'TEST') self.assertEquals(document.getLanguage(), 'en') self.assertEquals(document.getVersion(), '002') - self.assertEquals(document.getSourceReference(), file_name) + self.assertEquals(document.getFilename(), filename) def stepCheckConvertedContent(self, sequence=None, sequence_list=None, **kw): """ @@ -607,7 +606,7 @@ class TestIngestion(ERP5TypeTestCase): the word "magic" """ self.tic() - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) self.assert_(document.hasBaseData()) self.assert_('magic' in document.SearchableText()) self.assert_('magic' in str(document.asText())) @@ -617,9 +616,9 @@ class TestIngestion(ERP5TypeTestCase): Create Text_getPropertyDictFrom[source] scripts to simulate custom site's configuration """ - self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin', + self.newPythonScript('Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'contributor':'person_module/john'}") - self.newPythonScript('one', 'Text_getPropertyDictFromContent', '', + self.newPythonScript('Text_getPropertyDictFromContent', '', "return {'short_title':'short', 'title':'title', 'contributor':'person_module/john',}") def stepTestMetadataSetting(self, sequence=None, sequence_list=None, **kw): @@ -627,9 +626,8 @@ class TestIngestion(ERP5TypeTestCase): Upload with custom getPropertyDict methods check that all metadata are correct """ - document = self.getDocument('one') f = makeFileUpload('TEST-en-002.doc') - document.edit(file=f) + document = self.portal.portal_contributions.newContent(file=f) self.stepTic() # Then make sure content discover works property_dict = document.getPropertyDictFromUserLogin() @@ -647,7 +645,7 @@ class TestIngestion(ERP5TypeTestCase): """ we change metadata in a document which has ODF """ - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) kw = dict(title='another title', subject='another subject', description='another description') @@ -661,7 +659,7 @@ class TestIngestion(ERP5TypeTestCase): # XXX actually this is an example of how it should be # implemented in OOoDocument class - we don't really # need oood for getting/setting metadata... - document = self.getDocument('one') + document = self.portal.restrictedTraverse(sequence.get('document_path')) newcontent = document.getBaseData() builder = OOoBuilder(newcontent) xml_tree = etree.fromstring(builder.extract('meta.xml')) @@ -678,23 +676,28 @@ class TestIngestion(ERP5TypeTestCase): make sure they are converted """ format_list = ['rtf', 'doc', 'txt', 'sxw', 'sdw'] - self.ingestFormatList('one', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) - def stepIngestSpreadsheetFormats(self, sequence=None, sequence_list=None, **kw): + def stepIngestSpreadsheetFormats(self, sequence=None, sequence_list=None, + **kw): """ ingest all supported spreadsheet formats make sure they are converted """ format_list = ['xls', 'sxc', 'sdc'] - self.ingestFormatList('two', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) - def stepIngestPresentationFormats(self, sequence=None, sequence_list=None, **kw): + def stepIngestPresentationFormats(self, sequence=None, sequence_list=None, + **kw): """ ingest all supported presentation formats make sure they are converted """ format_list = ['ppt', 'sxi', 'sdd'] - self.ingestFormatList('three', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw): """ @@ -702,7 +705,8 @@ class TestIngestion(ERP5TypeTestCase): make sure they are converted """ format_list = ['pdf'] - self.ingestFormatList('five', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) def stepIngestDrawingFormats(self, sequence=None, sequence_list=None, **kw): """ @@ -710,7 +714,8 @@ class TestIngestion(ERP5TypeTestCase): make sure they are converted """ format_list = ['sxd',] - self.ingestFormatList('four', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) def stepIngestPDFFormats(self, sequence=None, sequence_list=None, **kw): """ @@ -718,39 +723,52 @@ class TestIngestion(ERP5TypeTestCase): make sure they are converted """ format_list = ['pdf'] - self.ingestFormatList('five', format_list) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) def stepIngestImageFormats(self, sequence=None, sequence_list=None, **kw): """ ingest all supported image formats """ format_list = ['jpg', 'gif', 'bmp', 'png'] - self.ingestFormatList('six', format_list, 'Image') + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) def stepIngestFileFormats(self, sequence=None, sequence_list=None, **kw): """ ingest all supported file formats """ format_list = ['txt', 'rss', 'xml',] - self.ingestFormatList('file', format_list) - - def stepCheckTextDocumentExportList(self, sequence=None, sequence_list=None, **kw): - self.checkDocumentExportList('one', 'doc', ['pdf', 'doc', 'rtf', 'writer.html', 'txt']) - - def stepCheckSpreadsheetDocumentExportList(self, sequence=None, sequence_list=None, **kw): - self.checkDocumentExportList('two', 'xls', ['csv', 'calc.html', 'xls', 'calc.pdf']) - - def stepCheckPresentationDocumentExportList(self, sequence=None, sequence_list=None, **kw): - self.checkDocumentExportList('three', 'ppt', ['impr.pdf', 'ppt']) - - def stepCheckDrawingDocumentExportList(self, sequence=None, sequence_list=None, **kw): - self.checkDocumentExportList('four', 'sxd', ['jpg', 'draw.pdf', 'svg']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.ingestFormatList(document, format_list) + + def stepCheckTextDocumentExportList(self, sequence=None, sequence_list=None, + **kw): + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.checkDocumentExportList(document, 'doc', + ['pdf', 'doc', 'rtf', 'writer.html', 'txt']) + + def stepCheckSpreadsheetDocumentExportList(self, sequence=None, + sequence_list=None, **kw): + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.checkDocumentExportList(document, 'xls', + ['csv', 'calc.html', 'xls', 'calc.pdf']) + + def stepCheckPresentationDocumentExportList(self, sequence=None, + sequence_list=None, **kw): + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.checkDocumentExportList(document, 'ppt', ['impr.pdf', 'ppt']) + + def stepCheckDrawingDocumentExportList(self, sequence=None, + sequence_list=None, **kw): + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.checkDocumentExportList(document, 'sxd', ['jpg', 'draw.pdf', 'svg']) def stepExportPDF(self, sequence=None, sequence_list=None, **kw): """ Try to export PDF to text and HTML """ - document = self.getDocument('five') + document = self.portal.restrictedTraverse(sequence.get('document_path')) f = makeFileUpload('TEST-en-002.pdf') document.edit(file=f) mime, text = document.convert('text') @@ -764,7 +782,7 @@ class TestIngestion(ERP5TypeTestCase): """ Check we are able to resize images """ - image = self.portal.image_module.six + image = self.portal.restrictedTraverse(sequence.get('document_path')) f = makeFileUpload('TEST-en-002.jpg') image.edit(file=f) self.stepTic() @@ -781,7 +799,7 @@ class TestIngestion(ERP5TypeTestCase): """ portal = self.getPortal() for module in (portal.document_module, portal.image_module, portal.document_ingestion_module): - module.manage_delObjects(map(None, module.objectIds())) + module.manage_delObjects(list(module.objectIds())) def stepContributeFileListWithType(self, sequence=None, sequence_list=None, **kw): """ @@ -790,14 +808,16 @@ class TestIngestion(ERP5TypeTestCase): """ self.contributeFileList(with_portal_type=True) - def stepContributeFileListWithNoType(self, sequence=None, sequence_list=None, **kw): + def stepContributeFileListWithNoType(self, sequence=None, sequence_list=None, + **kw): """ Contribute all kinds of files let the system figure out portal type by itself """ self.contributeFileList(with_portal_type=False) - def stepSetSimulatedDiscoveryScriptForOrdering(self, sequence=None, sequence_list=None, **kw): + def stepSetSimulatedDiscoveryScriptForOrdering(self, sequence=None, + sequence_list=None, **kw): """ set scripts which are supposed to overwrite each other's metadata desing is the following: @@ -808,53 +828,85 @@ class TestIngestion(ERP5TypeTestCase): contributor john jack james short_title from_content from_input """ - self.newPythonScript('one', 'Text_getPropertyDictFromUserLogin', 'user_name=None', "return {'reference':'USER', 'language':'us', 'contributor':'person_module/john'}") - self.newPythonScript('one', 'Text_getPropertyDictFromContent', '', "return {'reference':'CONT', 'version':'003', 'contributor':'person_module/jack', 'short_title':'from_content'}") - - def stepCheckMetadataSettingOrderFICU(self, sequence=None, sequence_list=None, **kw): + input_dict = dict(reference='INPUT', + language='in', + version='004', + short_title='from_input', + contributor='person_module/james') + self.newPythonScript('Text_getPropertyDictFromInput', + 'inputed_kw', "return %r" % (input_dict,)) + self.newPythonScript('Text_getPropertyDictFromUserLogin', 'user_name=None', + "return {'reference':'USER', 'language':'us',"\ + " 'contributor':'person_module/john'}") + self.newPythonScript('Text_getPropertyDictFromContent', '', + "return {'reference':'CONT', 'version':'003',"\ + " 'contributor':'person_module/jack',"\ + " 'short_title':'from_content'}") + + def stepCheckMetadataSettingOrderFICU(self, sequence=None, + sequence_list=None, **kw): """ This is the default """ - expected_metadata = dict(reference='TEST', language='en', version='002', short_title='from_input', contributor='person_module/james') - self.setDiscoveryOrder(['file_name', 'input', 'content', 'user_login']) - self.discoverMetadata() - self.checkMetadataOrder(expected_metadata) + expected_metadata = dict(reference='TEST', language='en', version='002', + short_title='from_input', + contributor='person_module/james') + self.setDiscoveryOrder(['filename', 'input', 'content', 'user_login']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.discoverMetadata(document) + self.checkMetadataOrder(document, expected_metadata) - def stepCheckMetadataSettingOrderCUFI(self, sequence=None, sequence_list=None, **kw): + def stepCheckMetadataSettingOrderCUFI(self, sequence=None, + sequence_list=None, **kw): """ Content - User - Filename - Input """ - expected_metadata = dict(reference='CONT', language='us', version='003', short_title='from_content', contributor='person_module/jack') - self.setDiscoveryOrder(['content', 'user_login', 'file_name', 'input']) - self.discoverMetadata() - self.checkMetadataOrder(expected_metadata) + expected_metadata = dict(reference='CONT', language='us', version='003', + short_title='from_content', + contributor='person_module/jack') + self.setDiscoveryOrder(['content', 'user_login', 'filename', 'input']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.discoverMetadata(document) + self.checkMetadataOrder(document, expected_metadata) - def stepCheckMetadataSettingOrderUIFC(self, sequence=None, sequence_list=None, **kw): + def stepCheckMetadataSettingOrderUIFC(self, sequence=None, + sequence_list=None, **kw): """ User - Input - Filename - Content """ - expected_metadata = dict(reference='USER', language='us', version='004', short_title='from_input', contributor='person_module/john') - self.setDiscoveryOrder(['user_login', 'input', 'file_name', 'content']) - self.discoverMetadata() - self.checkMetadataOrder(expected_metadata) + expected_metadata = dict(reference='USER', language='us', version='004', + short_title='from_input', + contributor='person_module/john') + self.setDiscoveryOrder(['user_login', 'input', 'filename', 'content']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.discoverMetadata(document) + self.checkMetadataOrder(document, expected_metadata) - def stepCheckMetadataSettingOrderICUF(self, sequence=None, sequence_list=None, **kw): + def stepCheckMetadataSettingOrderICUF(self, sequence=None, + sequence_list=None, **kw): """ Input - Content - User - Filename """ - expected_metadata = dict(reference='INPUT', language='in', version='004', short_title='from_input', contributor='person_module/james') - self.setDiscoveryOrder(['input', 'content', 'user_login', 'file_name']) - self.discoverMetadata() - self.checkMetadataOrder(expected_metadata) + expected_metadata = dict(reference='INPUT', language='in', version='004', + short_title='from_input', + contributor='person_module/james') + self.setDiscoveryOrder(['input', 'content', 'user_login', 'filename']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.discoverMetadata(document) + self.checkMetadataOrder(document, expected_metadata) - def stepCheckMetadataSettingOrderUFCI(self, sequence=None, sequence_list=None, **kw): + def stepCheckMetadataSettingOrderUFCI(self, sequence=None, + sequence_list=None, **kw): """ User - Filename - Content - Input """ - expected_metadata = dict(reference='USER', language='us', version='002', short_title='from_content', contributor='person_module/john') - self.setDiscoveryOrder(['user_login', 'file_name', 'content', 'input']) - self.discoverMetadata() - self.checkMetadataOrder(expected_metadata) + expected_metadata = dict(reference='USER', language='us', version='002', + short_title='from_content', + contributor='person_module/john') + self.setDiscoveryOrder(['user_login', 'filename', 'content', 'input']) + document = self.portal.restrictedTraverse(sequence.get('document_path')) + self.discoverMetadata(document) + self.checkMetadataOrder(document, expected_metadata) def stepReceiveEmail(self, sequence=None, sequence_list=None, **kw): """ @@ -864,7 +916,8 @@ class TestIngestion(ERP5TypeTestCase): document = self.receiveEmail(f.read()) self.stepTic() - def stepReceiveMultipleAttachmentsEmail(self, sequence=None, sequence_list=None, **kw): + def stepReceiveMultipleAttachmentsEmail(self, sequence=None, + sequence_list=None, **kw): """ Email was sent in by someone to ERP5. """ @@ -958,7 +1011,7 @@ class TestIngestion(ERP5TypeTestCase): reference='MAIL', language='en', version='002') - self.assertEquals('MAIL-en-002.doc', ingested_document.getSourceReference()) + self.assertEquals('MAIL-en-002.doc', ingested_document.getFilename()) self.assertEquals('converted', ingested_document.getExternalProcessingState()) self.assertTrue('magic' in ingested_document.asText()) @@ -978,7 +1031,7 @@ class TestIngestion(ERP5TypeTestCase): conversion_dict = _getConversionServerDict() self.assertEquals(preference_tool.getPreferredOoodocServerAddress(), conversion_dict['hostname']) self.assertEquals(preference_tool.getPreferredOoodocServerPortNumber(), conversion_dict['port']) - self.assertEquals(preference_tool.getPreferredDocumentFileNameRegularExpression(), FILE_NAME_REGULAR_EXPRESSION) + self.assertEquals(preference_tool.getPreferredDocumentFilenameRegularExpression(), FILENAME_REGULAR_EXPRESSION) self.assertEquals(preference_tool.getPreferredDocumentReferenceRegularExpression(), REFERENCE_REGULAR_EXPRESSION) def test_02_FileExtensionRegistry(self): @@ -1008,8 +1061,8 @@ class TestIngestion(ERP5TypeTestCase): 'xxx' : 'File', } for type, portal_type in correct_type_mapping.items(): - file_name = 'aaa.' + type - self.assertEquals(reg.findPortalTypeName(file_name, None, None), + filename = 'aaa.' + type + self.assertEquals(reg.findPortalTypeName(filename=filename), portal_type) def test_03_TextDoc(self): @@ -1300,7 +1353,7 @@ class TestIngestion(ERP5TypeTestCase): """ f = makeFileUpload('TEST-en-002.doc', 'T&é@{T-en-002.doc') document = self.portal.portal_contributions.newContent(file=f) - sequence.edit(document_id=document.getId()) + sequence.edit(document_path=document.getPath()) transaction.commit() def stepDiscoverFromFilenameWithNonASCIIFilename(self, @@ -1310,10 +1363,10 @@ class TestIngestion(ERP5TypeTestCase): discovery and we should have basic coordinates immediately, from first stage. """ - context = self.getDocument(sequence.get('document_id')) - file_name = 'T&é@{T-en-002.doc' + context = self.portal.restrictedTraverse(sequence.get('document_path')) + filename = 'T&é@{T-en-002.doc' # First make sure the regular expressions work - property_dict = context.getPropertyDictFromFileName(file_name) + property_dict = context.getPropertyDictFromFilename(filename) self.assertEquals(property_dict['reference'], 'T&é@{T') self.assertEquals(property_dict['language'], 'en') self.assertEquals(property_dict['version'], '002') @@ -1327,7 +1380,7 @@ class TestIngestion(ERP5TypeTestCase): self.assertEquals(context.getReference(), 'T&é@{T') self.assertEquals(context.getLanguage(), 'en') self.assertEquals(context.getVersion(), '002') - self.assertEquals(context.getSourceReference(), file_name) + self.assertEquals(context.getFilename(), filename) def test_13_UploadTextFromContributionToolWithNonASCIIFilename(self): """ @@ -1363,8 +1416,8 @@ class TestIngestion(ERP5TypeTestCase): self.assertEquals(1, len(portal.portal_catalog(path=contribution_tool.getPath()))) - def test_15_TestFileNameDiscovery(self): - """Test that filename is well set in source_reference + def test_15_TestFilenameDiscovery(self): + """Test that filename is well set in filename - filename can we discovery from file - filename can be pass as argument by the user """ @@ -1372,12 +1425,12 @@ class TestIngestion(ERP5TypeTestCase): contribution_tool = getToolByName(portal, 'portal_contributions') file_object = makeFileUpload('TEST-en-002.doc') document = contribution_tool.newContent(file=file_object) - self.assertEquals(document.getSourceReference(), 'TEST-en-002.doc') + self.assertEquals(document.getFilename(), 'TEST-en-002.doc') my_filename = 'Something.doc' document = contribution_tool.newContent(file=file_object, - file_name=my_filename) + filename=my_filename) self.stepTic() - self.assertEquals(document.getSourceReference(), my_filename) + self.assertEquals(document.getFilename(), my_filename) def test_16_TestMetadataDiscoveryFromUserLogin(self): """ @@ -1395,16 +1448,502 @@ class TestIngestion(ERP5TypeTestCase): self.stepTic() file_object = makeFileUpload('TEST-en-002.doc') document = contribution_tool.newContent(file=file_object) - document.discoverMetadata(document.getSourceReference(), 'contributor1') + document.discoverMetadata(document.getFilename(), 'contributor1') self.stepTic() - self.assertEquals(document.getSourceReference(), 'TEST-en-002.doc') + self.assertEquals(document.getFilename(), 'TEST-en-002.doc') self.assertEquals('anybody', document.getGroup()) self.assertEquals('site/arctic/spitsbergen', document.getSite()) -# Missing tests + def test_IngestionConfigurationByTypeBasedMethod_usecase1(self): + """How to configure meta data discovery so that each time a file + with same URL is uploaded, a new document is created with same reference + but increased version ? + """ + input_script_id = 'Document_getPropertyDictFromContent' + python_code = """from Products.CMFCore.utils import getToolByName +portal = context.getPortalObject() +information = context.getContentInformation() + +result = {} +property_id_list = context.propertyIds() +for k, v in information.items(): + key = k.lower() + if v: + if isinstance(v, unicode): + v = v.encode('utf-8') + if key in property_id_list: + if key == 'reference': + pass # XXX - We can not trust reference on getContentInformation + else: + result[key] = v + elif key == 'author': + p = context.portal_catalog.getResultValue(title=v, portal_type='Person') + if p is not None: + result['contributor'] = p.getRelativeUrl() + elif key == 'keywords': + result['subject_list'] = v.split() + +reference = context.asNormalisedURL() + +result['reference'] = reference +id_group = ('dms_version_generator', reference) +result['version'] = '%.5d' % (portal.portal_ids.generateNewId(id_group=id_group, default=1)) +return result +""" + self.newPythonScript(input_script_id, '', python_code) + document_to_ingest = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest.publish() + transaction.commit() + self.tic() + url = document_to_ingest.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL()) + self.assertEquals(first_doc.getVersion(), '00001') + self.assertEquals(first_doc.asURL(), url) + second_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL()) + self.assertEquals(second_doc.getVersion(), '00002') + self.assertEquals(second_doc.asURL(), url) + + document_to_ingest2 = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest2.publish() + transaction.commit() + self.tic() + url2 = document_to_ingest2.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL()) + self.assertEquals(first_doc.getVersion(), '00001') + self.assertEquals(first_doc.asURL(), url2) + second_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL()) + self.assertEquals(second_doc.getVersion(), '00002') + self.assertEquals(second_doc.asURL(), url2) + + def test_IngestionConfigurationByTypeBasedMethod_usecase2(self): + """How to configure meta data discovery so that each time a file + with same URL is uploaded, a new document is created + with same reference but same version ? + """ + input_script_id = 'Document_getPropertyDictFromContent' + python_code = """from Products.CMFCore.utils import getToolByName +portal = context.getPortalObject() +information = context.getContentInformation() + +result = {} +property_id_list = context.propertyIds() +for k, v in information.items(): + key = k.lower() + if v: + if isinstance(v, unicode): + v = v.encode('utf-8') + if key in property_id_list: + if key == 'reference': + pass # XXX - We can not trust reference on getContentInformation + else: + result[key] = v + elif key == 'author': + p = context.portal_catalog.getResultValue(title=v, portal_type='Person') + if p is not None: + result['contributor'] = p.getRelativeUrl() + elif key == 'keywords': + result['subject_list'] = v.split() + +reference = context.asNormalisedURL() +result['reference'] = reference +return result +""" + self.newPythonScript(input_script_id, '', python_code) + document_to_ingest = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest.publish() + transaction.commit() + self.tic() + url = document_to_ingest.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL()) + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url) + second_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL()) + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url) + + document_to_ingest2 = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest2.publish() + transaction.commit() + self.tic() + url2 = document_to_ingest2.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), first_doc.asNormalisedURL()) + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url2) + second_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), second_doc.asNormalisedURL()) + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url2) + + def test_IngestionConfigurationByTypeBasedMethod_usecase3(self): + """How to discover metadata so that each new document + has a new reference which is generated automatically + as an increase sequence of numbers ? + """ + input_script_id = 'Document_finishIngestion' + python_code = """from Products.CMFCore.utils import getToolByName +portal = context.getPortalObject() +portal_ids = getToolByName(portal, 'portal_ids') +id_group = 'dms_reference_generator3' +reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group) +context.setReference(reference) +""" + self.newPythonScript(input_script_id, '', python_code) + document_to_ingest = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest.publish() + transaction.commit() + self.tic() + url = document_to_ingest.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 1') + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url) + second_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 2') + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url) + + document_to_ingest2 = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest2.publish() + transaction.commit() + self.tic() + self.assertEquals(document_to_ingest2.getReference(), + 'I CHOOSED THIS REFERENCE 3') + + url2 = document_to_ingest2.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 4') + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url2) + second_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 5') + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url2) + + def test_IngestionConfigurationByTypeBasedMethod_usecase4(self): + """How to configure meta data discovery so that each time a file + with same URL is uploaded, a new document is created + with same reference (generated automatically as an + increase sequence of numbers) but increased version ? + """ + input_script_id = 'Document_getPropertyDictFromContent' + python_code = """from Products.CMFCore.utils import getToolByName +portal = context.getPortalObject() +information = context.getContentInformation() + +result = {} +property_id_list = context.propertyIds() +for k, v in information.items(): + key = k.lower() + if v: + if isinstance(v, unicode): + v = v.encode('utf-8') + if key in property_id_list: + if key == 'reference': + pass # XXX - We can not trust reference on getContentInformation + else: + result[key] = v + elif key == 'author': + p = context.portal_catalog.getResultValue(title=v, portal_type='Person') + if p is not None: + result['contributor'] = p.getRelativeUrl() + elif key == 'keywords': + result['subject_list'] = v.split() + +url = context.asNormalisedURL() +portal_url_registry = getToolByName(context.getPortalObject(), + 'portal_url_registry') +try: + reference = portal_url_registry.getReferenceFromURL(url) +except KeyError: + id_group = 'dms_reference_generator4' + reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group) +result['reference'] = reference +id_group = ('dms_version_generator', reference) +result['version'] = '%.5d' % (portal.portal_ids.generateNewId(id_group=id_group, default=1)) +return result """ - property_dict = context.getPropertyDictFromInput() + self.newPythonScript(input_script_id, '', python_code) + document_to_ingest = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest.publish() + transaction.commit() + self.tic() + url = document_to_ingest.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 1') + self.assertEquals(first_doc.getVersion(), '00001') + self.assertEquals(first_doc.asURL(), url) + second_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 1') + self.assertEquals(second_doc.getVersion(), '00002') + self.assertEquals(second_doc.asURL(), url) + + document_to_ingest2 = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest2.publish() + transaction.commit() + self.tic() + self.assertEquals(document_to_ingest2.getReference(), + 'I CHOOSED THIS REFERENCE 2') + + url2 = document_to_ingest2.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 3') + self.assertEquals(first_doc.getVersion(), '00001') + self.assertEquals(first_doc.asURL(), url2) + second_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 3') + self.assertEquals(second_doc.getVersion(), '00002') + self.assertEquals(second_doc.asURL(), url2) + + def test_IngestionConfigurationByTypeBasedMethod_usecase5(self): + """How to configure meta data discovery so that each time a file + with same URL is uploaded, a new document is created + with same reference (generated automatically as + an increase sequence of numbers) but same version? + """ + input_script_id = 'Document_getPropertyDictFromContent' + python_code = """from Products.CMFCore.utils import getToolByName +portal = context.getPortalObject() +information = context.getContentInformation() + +result = {} +property_id_list = context.propertyIds() +for k, v in information.items(): + key = k.lower() + if v: + if isinstance(v, unicode): + v = v.encode('utf-8') + if key in property_id_list: + if key == 'reference': + pass # XXX - We can not trust reference on getContentInformation + else: + result[key] = v + elif key == 'author': + p = context.portal_catalog.getResultValue(title=v, portal_type='Person') + if p is not None: + result['contributor'] = p.getRelativeUrl() + elif key == 'keywords': + result['subject_list'] = v.split() + +url = context.asNormalisedURL() +portal_url_registry = getToolByName(context.getPortalObject(), + 'portal_url_registry') +try: + reference = portal_url_registry.getReferenceFromURL(url) +except KeyError: + id_group = 'dms_reference_generator5' + reference = 'I CHOOSED THIS REFERENCE %s' % portal.portal_ids.generateNewId(id_group=id_group) +result['reference'] = reference +return result """ + self.newPythonScript(input_script_id, '', python_code) + document_to_ingest = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest.publish() + transaction.commit() + self.tic() + + url = document_to_ingest.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 1') + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url) + second_doc = self.portal.portal_contributions.newContent(url=url) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 1') + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url) + + document_to_ingest2 = self.portal.portal_contributions.newContent( + portal_type='File', + filename='toto.txt', + data='Hello World!') + document_to_ingest2.publish() + transaction.commit() + self.tic() + self.assertEquals(document_to_ingest2.getReference(), + 'I CHOOSED THIS REFERENCE 2') + + url2 = document_to_ingest2.absolute_url() + '/getData' + first_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(first_doc.getPortalType(), 'Text') + self.assertEquals(first_doc.getContentType(), 'text/plain') + self.assertEquals(first_doc.getReference(), 'I CHOOSED THIS REFERENCE 3') + self.assertEquals(first_doc.getVersion(), '001') + self.assertEquals(first_doc.asURL(), url2) + second_doc = self.portal.portal_contributions.newContent(url=url2) + transaction.commit() + self.tic() + self.assertEquals(second_doc.getPortalType(), 'Text') + self.assertEquals(second_doc.getContentType(), 'text/plain') + self.assertEquals(second_doc.getReference(), 'I CHOOSED THIS REFERENCE 3') + self.assertEquals(second_doc.getVersion(), '001') + self.assertEquals(second_doc.asURL(), url2) + + def test_IngestionConfigurationByTypeBasedMethod_usecase6(self): + """How to configure meta data discovery so that a Spreadsheet + as a application/octet-stream without explicit extension, become + a Spreadsheet ? + """ + path = makeFilePath('import_region_category.xls') + data = open(path, 'r').read() + + document = self.portal.portal_contributions.newContent(filename='toto', + data=data, + reference='Custom.Reference') + transaction.commit() + self.tic()# Discover metadata will delete first ingested document + # then reingest new one with appropriate portal_type + result_list = self.portal.portal_catalog(reference='Custom.Reference') + self.assertEquals(len(result_list), 1) + self.assertEquals(result_list[0].getPortalType(), 'Spreadsheet') + + def test_IngestionConfigurationByTypeBasedMethod_usecase7(self): + """How to reingest a published document, by a user action ? + If after a while the user decide to change the portal_type of a + published document , File => Text ? + """ + module = self.portal.document_module + document = module.newContent(portal_type='File', + property_which_doesnot_exists='Foo', + data='Hello World!', + filename='toto.txt') + document.publish() + transaction.commit() + self.tic() + document.edit(title='One title', reference='EFAA') + transaction.commit() + self.tic() + # Now change it to a Text portal_type + new_doc = document.migratePortalType('Text') + transaction.commit() + self.tic() + self.assertEquals(new_doc.getPortalType(), 'Text') + self.assertEquals(new_doc.getProperty('property_which_doesnot_exists'), + 'Foo') + self.assertEquals(new_doc.getTitle(), 'One title') + self.assertEquals(new_doc.getReference(), 'EFAA') + self.assertEquals(new_doc.getValidationState(), 'published') + self.assertEquals(new_doc.getData(), 'Hello World!') + + # Migrate a document with url property + url = new_doc.absolute_url() + '/getData' + document = self.portal.portal_contributions.newContent(url=url) + document.submit() + transaction.commit() + self.tic() + self.assertEquals(document.getPortalType(), 'Text') + # Change it to File + new_doc = document.migratePortalType('File') + self.assertEquals(new_doc.getPortalType(), 'File') + self.assertEquals(new_doc.asURL(), url) + self.assertEquals(new_doc.getData(), 'Hello World!') + self.assertEquals(new_doc.getValidationState(), 'submitted') def test_suite(): suite = unittest.TestSuite() diff --git a/product/ERP5OOo/tests/testOOoConversionCache.py b/product/ERP5OOo/tests/testOOoConversionCache.py index 001e71989dd..3b2a5c4172d 100644 --- a/product/ERP5OOo/tests/testOOoConversionCache.py +++ b/product/ERP5OOo/tests/testOOoConversionCache.py @@ -42,11 +42,6 @@ from zLOG import LOG import os -TEST_FILES_HOME = os.path.join(os.path.dirname(__file__), 'test_document') -FILE_NAME_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})-(?P<language>[a-z]{2})-(?P<version>[0-9]{3})" -REFERENCE_REGULAR_EXPRESSION = "(?P<reference>[A-Z]{3,10})(-(?P<language>[a-z]{2}))?(-(?P<version>[0-9]{3}))?" - - def makeFilePath(name): return os.path.join(os.path.dirname(__file__), 'test_document', name) @@ -291,7 +286,10 @@ class TestDocumentConversionCache(TestDocumentMixin): filename = 'TEST-en-002.doc' file = makeFileUpload(filename) document_id = 'an id with spaces' - document = self.portal.portal_contributions.newContent(id=document_id, file=file) + portal_type = 'Text' + module = self.portal.getDefaultModule(portal_type) + document = module.newContent(id=document_id, file=file, + portal_type=portal_type) transaction.commit() self.tic() document_url = document.getRelativeUrl() -- 2.30.9