Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
erp5
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
138
Merge Requests
138
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
nexedi
erp5
Commits
9e2642a7
Commit
9e2642a7
authored
Jan 15, 2024
by
Jérome Perrin
Browse files
Options
Browse Files
Download
Plain Diff
DMS: handle gracefully unauthorized ingestion scenarios
See merge request
nexedi/erp5!1846
parents
0cfb1586
5c601193
Pipeline
#32188
failed with stage
Changes
6
Pipelines
2
Hide whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
328 additions
and
78 deletions
+328
-78
bt5/erp5_dms/DocumentTemplateItem/portal_components/document.erp5.PDFDocument.py
...mplateItem/portal_components/document.erp5.PDFDocument.py
+8
-1
bt5/erp5_dms/TestTemplateItem/portal_components/test.erp5.testDms.py
...s/TestTemplateItem/portal_components/test.erp5.testDms.py
+88
-0
bt5/erp5_ingestion/SkinTemplateItem/portal_skins/erp5_ingestion/Base_contribute.py
...mplateItem/portal_skins/erp5_ingestion/Base_contribute.py
+52
-33
product/ERP5/bootstrap/erp5_core/MixinTemplateItem/portal_components/mixin.erp5.DiscoverableMixin.py
...ateItem/portal_components/mixin.erp5.DiscoverableMixin.py
+28
-17
product/ERP5/bootstrap/erp5_core/ToolComponentTemplateItem/portal_components/tool.erp5.ContributionTool.py
...plateItem/portal_components/tool.erp5.ContributionTool.py
+20
-19
product/ERP5OOo/tests/testIngestion.py
product/ERP5OOo/tests/testIngestion.py
+132
-8
No files found.
bt5/erp5_dms/DocumentTemplateItem/portal_components/document.erp5.PDFDocument.py
View file @
9e2642a7
...
@@ -395,4 +395,11 @@ class PDFDocument(Image):
...
@@ -395,4 +395,11 @@ class PDFDocument(Image):
del
self
.
_content_information
del
self
.
_content_information
except
(
AttributeError
,
KeyError
):
except
(
AttributeError
,
KeyError
):
pass
pass
Image
.
_setFile
(
self
,
*
args
,
**
kw
)
super
(
PDFDocument
,
self
).
_setFile
(
*
args
,
**
kw
)
def
_setData
(
self
,
*
args
,
**
kw
):
try
:
del
self
.
_content_information
except
(
AttributeError
,
KeyError
):
pass
super
(
PDFDocument
,
self
).
_setData
(
*
args
,
**
kw
)
bt5/erp5_dms/TestTemplateItem/portal_components/test.erp5.testDms.py
View file @
9e2642a7
...
@@ -3055,6 +3055,94 @@ class TestDocumentWithSecurity(TestDocumentMixin):
...
@@ -3055,6 +3055,94 @@ class TestDocumentWithSecurity(TestDocumentMixin):
user_pref
.
getPreferredThumbnailImageHeight
()),
user_pref
.
getPreferredThumbnailImageHeight
()),
image
.
getSizeFromImageDisplay
(
'thumbnail'
))
image
.
getSizeFromImageDisplay
(
'thumbnail'
))
def
test_mergeRevision
(
self
):
document1
=
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
))
self
.
tic
()
self
.
assertEqual
(
(
document1
.
getReference
(),
document1
.
getLanguage
(),
document1
.
getVersion
()),
(
'TEST'
,
'en'
,
'002'
))
self
.
assertNotIn
(
'This document is modified'
,
document1
.
asText
())
document2
=
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002-modified.doc'
))
self
.
tic
()
self
.
assertIn
(
'This document is modified'
,
document2
.
asText
())
self
.
assertEqual
(
(
document2
.
getReference
(),
document2
.
getLanguage
(),
document2
.
getVersion
()),
(
'TEST'
,
'en'
,
'002'
))
# document was updated in-place
self
.
assertEqual
(
document1
.
getRelativeUrl
(),
document2
.
getRelativeUrl
())
document2
.
manage_addLocalRoles
(
self
.
username
,
[
'Assignor'
])
document2
.
share
()
self
.
tic
()
# once the document is shared, the user can no longer edit it and trying to upload
# the same reference causes an error.
with
self
.
assertRaisesRegex
(
Unauthorized
,
"You are not allowed to update the existing document"
):
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
))
# this also works with another user which can not see the document
another_user_id
=
self
.
id
()
uf
=
self
.
portal
.
acl_users
uf
.
_doAddUser
(
another_user_id
,
''
,
[
'Author'
],
[])
newSecurityManager
(
None
,
uf
.
getUserById
(
another_user_id
).
__of__
(
uf
))
with
self
.
assertRaisesRegex
(
Unauthorized
,
"You are not allowed to update the existing document"
):
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
))
def
test_mergeRevision_with_node_reference_local_reference_filename_regular_expression
(
self
):
# this filename regular expression comes from configurator
self
.
getDefaultSystemPreference
().
setPreferredDocumentFilenameRegularExpression
(
"(?P<node_reference>[a-zA-Z0-9_-]+)-(?P<local_reference>[a-zA-Z0-9_.]+)-(?P<version>[0-9a-zA-Z.]+)-(?P<language>[a-z]{2})[^-]*?"
)
self
.
tic
()
document1
=
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
,
as_name
=
'P-PROJ-TEST-002-en.doc'
))
self
.
tic
()
self
.
assertEqual
(
(
document1
.
getReference
(),
document1
.
getLanguage
(),
document1
.
getVersion
()),
(
'P-PROJ-TEST'
,
'en'
,
'002'
))
self
.
assertNotIn
(
'This document is modified'
,
document1
.
asText
())
document2
=
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002-modified.doc'
,
as_name
=
'P-PROJ-TEST-002-en.doc'
))
self
.
tic
()
self
.
assertIn
(
'This document is modified'
,
document2
.
asText
())
self
.
assertEqual
(
(
document2
.
getReference
(),
document2
.
getLanguage
(),
document2
.
getVersion
()),
(
'P-PROJ-TEST'
,
'en'
,
'002'
))
# document was updated in-place
self
.
assertEqual
(
document1
.
getRelativeUrl
(),
document2
.
getRelativeUrl
())
document2
.
manage_addLocalRoles
(
self
.
username
,
[
'Assignor'
])
document2
.
share
()
self
.
tic
()
# once the document is shared, the user can no longer edit it and trying to upload
# the same reference causes an error.
with
self
.
assertRaisesRegex
(
Unauthorized
,
"You are not allowed to update the existing document"
):
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
,
as_name
=
'P-PROJ-TEST-002-en.doc'
))
# this also works with another user which can not see the document
another_user_id
=
self
.
id
()
uf
=
self
.
portal
.
acl_users
uf
.
_doAddUser
(
another_user_id
,
''
,
[
'Author'
],
[])
newSecurityManager
(
None
,
uf
.
getUserById
(
another_user_id
).
__of__
(
uf
))
with
self
.
assertRaisesRegex
(
Unauthorized
,
"You are not allowed to update the existing document"
):
self
.
portal
.
portal_contributions
.
newContent
(
file
=
makeFileUpload
(
'TEST-en-002.doc'
,
as_name
=
'P-PROJ-TEST-002-en.doc'
))
class
TestDocumentPerformance
(
TestDocumentMixin
):
class
TestDocumentPerformance
(
TestDocumentMixin
):
def
test_01_LargeOOoDocumentToImageConversion
(
self
):
def
test_01_LargeOOoDocumentToImageConversion
(
self
):
...
...
bt5/erp5_ingestion/SkinTemplateItem/portal_skins/erp5_ingestion/Base_contribute.py
View file @
9e2642a7
...
@@ -4,6 +4,8 @@
...
@@ -4,6 +4,8 @@
Use to contribute file to ERP5.
Use to contribute file to ERP5.
"""
"""
from
ZTUtils
import
make_query
from
ZTUtils
import
make_query
from
zExceptions
import
Unauthorized
from
Products.CMFCore.WorkflowCore
import
WorkflowException
portal
=
context
.
getPortalObject
()
portal
=
context
.
getPortalObject
()
translateString
=
portal
.
Base_translateString
translateString
=
portal
.
Base_translateString
...
@@ -53,38 +55,55 @@ if attach_document_to_context:
...
@@ -53,38 +55,55 @@ if attach_document_to_context:
document_kw
[
'follow_up_list'
]
=
follow_up_list
document_kw
[
'follow_up_list'
]
=
follow_up_list
document_kw
.
update
({
'discover_metadata'
:
not
synchronous_metadata_discovery
})
document_kw
.
update
({
'discover_metadata'
:
not
synchronous_metadata_discovery
})
if
url
is
not
None
:
try
:
# we contribute and URL, this happens entirely asynchronous
if
url
is
not
None
:
document
=
portal_contributions
.
newContentFromURL
(
url
=
url
,
\
# we contribute and URL, this happens entirely asynchronous
repeat
=
max_repeat
,
\
document
=
portal_contributions
.
newContentFromURL
(
url
=
url
,
\
batch_mode
=
batch_mode
,
\
repeat
=
max_repeat
,
\
**
document_kw
)
batch_mode
=
batch_mode
,
\
if
document
is
None
:
**
document_kw
)
# portal contributions could not upload it
if
document
is
None
:
if
cancel_url
is
not
None
:
# portal contributions could not upload it
# we can assume we can redirect
if
cancel_url
is
not
None
:
redirect_url
=
'%s?%s'
%
(
cancel_url
,
# we can assume we can redirect
make_query
(
dict
(
portal_status_message
=
translateString
(
"Wrong or not accessible URL address."
))))
redirect_url
=
'%s?%s'
%
(
cancel_url
,
return
context
.
REQUEST
.
RESPONSE
.
redirect
(
redirect_url
)
make_query
(
dict
(
portal_status_message
=
translateString
(
"Wrong or not accessible URL address."
))))
else
:
return
context
.
REQUEST
.
RESPONSE
.
redirect
(
redirect_url
)
# contribute file
else
:
document_kw
.
update
({
'file'
:
file
})
# contribute file
document
=
portal_contributions
.
newContent
(
**
document_kw
)
batch_mode
=
batch_mode
or
not
(
redirect_to_context
or
redirect_to_document
or
redirect_url
is
not
None
)
document_kw
.
update
({
'file'
:
file
})
document
=
portal_contributions
.
newContent
(
**
document_kw
)
is_existing_document_updated
=
False
is_existing_document_updated
=
False
if
synchronous_metadata_discovery
:
if
synchronous_metadata_discovery
:
# we need to do all synchronously, in other case portal_contributions will do
# we need to do all synchronously, in other case portal_contributions will do
# this in an activity
# this in an activity
if
document
.
isSupportBaseDataConversion
():
if
document
.
isSupportBaseDataConversion
():
document
.
processFile
()
document
.
processFile
()
filename
=
document
.
getFilename
()
filename
=
document
.
getFilename
()
merged_document
=
document
.
Document_convertToBaseFormatAndDiscoverMetadata
(
merged_document
=
document
.
Document_convertToBaseFormatAndDiscoverMetadata
(
filename
=
filename
,
filename
=
filename
,
user_login
=
user_login
,
user_login
=
user_login
,
input_parameter_dict
=
document_kw
)
input_parameter_dict
=
document_kw
)
is_existing_document_updated
=
(
merged_document
!=
document
)
is_existing_document_updated
=
(
merged_document
!=
document
)
document
=
merged_document
document
=
merged_document
except
(
Unauthorized
,
WorkflowException
)
as
e
:
if
batch_mode
:
raise
if
isinstance
(
e
,
WorkflowException
):
message
=
translateString
(
'You are not allowed to contribute document in that state.'
)
else
:
if
'You are not allowed to update the existing document'
not
in
str
(
e
):
raise
message
=
translateString
(
'You are not allowed to update the existing document which has the same coordinates.'
)
return
context
.
Base_redirect
(
'view'
,
abort_transaction
=
True
,
keep_items
=
{
'portal_status_message'
:
message
,
'portal_status_level'
:
'error'
,
})
document_portal_type
=
document
.
getTranslatedPortalType
()
document_portal_type
=
document
.
getTranslatedPortalType
()
if
not
is_existing_document_updated
:
if
not
is_existing_document_updated
:
...
@@ -94,7 +113,7 @@ else:
...
@@ -94,7 +113,7 @@ else:
message
=
translateString
(
'${portal_type} updated successfully.'
,
message
=
translateString
(
'${portal_type} updated successfully.'
,
mapping
=
dict
(
portal_type
=
document_portal_type
))
mapping
=
dict
(
portal_type
=
document_portal_type
))
if
redirect_to_context
or
redirect_to_document
or
redirect_url
is
not
Non
e
:
if
not
batch_mod
e
:
# this is an UI mode where script should handle HTTP redirects and is likely used
# this is an UI mode where script should handle HTTP redirects and is likely used
# by ERP5 form
# by ERP5 form
if
redirect_to_document
and
document
is
not
None
:
if
redirect_to_document
and
document
is
not
None
:
...
...
product/ERP5/bootstrap/erp5_core/MixinTemplateItem/portal_components/mixin.erp5.DiscoverableMixin.py
View file @
9e2642a7
...
@@ -120,24 +120,10 @@ class DiscoverableMixin(CachedConvertableMixin):
...
@@ -120,24 +120,10 @@ class DiscoverableMixin(CachedConvertableMixin):
return
{}
return
{}
### Metadata disovery and ingestion methods
### Metadata disovery and ingestion methods
security
.
declareProtected
(
Permissions
.
ModifyPortalContent
,
'discoverMetadata'
)
@
UnrestrictedMethod
@
UnrestrictedMethod
def
discoverMetadata
(
self
,
filename
=
None
,
user_login
=
None
,
def
_unrestrictedDiscoverMetadata
(
input_parameter_dict
=
None
):
self
,
filename
=
None
,
user_login
=
None
,
input_parameter_dict
=
None
):
"""
"""Unrestricted part of metadata discovery"""
This is the main metadata discovery function - controls the process
of discovering data from various sources. The discovery itself is
delegated to scripts or uses preference-configurable regexps. The
method returns either self or the document which has been
merged in the discovery process.
filename - this parameter is a file name of the form "AA-BBB-CCC-223-en"
user_login - this is a login string of a person; can be None if the user is
currently logged in, then we'll get him from session
input_parameter_dict - arguments provided to Create this content by user.
"""
if
input_parameter_dict
is
None
:
if
input_parameter_dict
is
None
:
input_parameter_dict
=
{}
input_parameter_dict
=
{}
# Preference is made of a sequence of 'user_login', 'content', 'filename', 'input'
# Preference is made of a sequence of 'user_login', 'content', 'filename', 'input'
...
@@ -189,6 +175,31 @@ class DiscoverableMixin(CachedConvertableMixin):
...
@@ -189,6 +175,31 @@ class DiscoverableMixin(CachedConvertableMixin):
if
portal_type
!=
self
.
getPortalType
():
if
portal_type
!=
self
.
getPortalType
():
return
self
.
migratePortalType
(
portal_type
)
return
self
.
migratePortalType
(
portal_type
)
security
.
declareProtected
(
Permissions
.
ModifyPortalContent
,
'discoverMetadata'
)
def
discoverMetadata
(
self
,
filename
=
None
,
user_login
=
None
,
input_parameter_dict
=
None
):
"""
This is the main metadata discovery function - controls the process
of discovering data from various sources. The discovery itself is
delegated to scripts or uses preference-configurable regexps. The
method returns either self or the document which has been
merged in the discovery process.
filename - this parameter is a file name of the form "AA-BBB-CCC-223-en"
user_login - this is a login string of a person; can be None if the user is
currently logged in, then we'll get him from session
input_parameter_dict - arguments provided to Create this content by user.
"""
discovered_document
=
self
.
_unrestrictedDiscoverMetadata
(
filename
=
filename
,
user_login
=
user_login
,
input_parameter_dict
=
input_parameter_dict
)
if
discovered_document
is
not
None
:
return
discovered_document
if
input_parameter_dict
is
None
:
input_parameter_dict
=
{}
def
maybeChangeState
(
document
):
def
maybeChangeState
(
document
):
publication_state
=
input_parameter_dict
.
get
(
'publication_state'
)
publication_state
=
input_parameter_dict
.
get
(
'publication_state'
)
if
publication_state
and
document
.
getValidationState
()
==
'draft'
:
if
publication_state
and
document
.
getValidationState
()
==
'draft'
:
...
...
product/ERP5/bootstrap/erp5_core/ToolComponentTemplateItem/portal_components/tool.erp5.ContributionTool.py
View file @
9e2642a7
...
@@ -223,26 +223,27 @@ class ContributionTool(BaseTool):
...
@@ -223,26 +223,27 @@ class ContributionTool(BaseTool):
return
response
.
redirect
(
self
.
absolute_url
())
return
response
.
redirect
(
self
.
absolute_url
())
return
document
return
document
#
# Check if same file is already exists. if it exists, then update it.
# Check if same file is already exists. if it exists, then update it.
#
# If we use discover metadata API, this will be during discoverMetadata and
property_dict
=
self
.
getMatchedFilenamePatternDict
(
filename
)
# mergeRevision.
reference
=
property_dict
.
get
(
'reference'
,
None
)
if
discover_metadata
:
version
=
property_dict
.
get
(
'version'
,
None
)
property_dict
=
self
.
getPropertyDictFromFilename
(
filename
)
language
=
property_dict
.
get
(
'language'
,
None
)
reference
=
property_dict
.
get
(
'reference'
,
None
)
if
portal_type
and
reference
and
version
and
language
:
version
=
property_dict
.
get
(
'version'
,
None
)
portal_catalog
=
portal
.
portal_catalog
language
=
property_dict
.
get
(
'language'
,
None
)
document
=
portal_catalog
.
getResultValue
(
portal_type
=
portal_type
,
if
portal_type
and
reference
and
version
and
language
:
reference
=
reference
,
document
=
portal
.
portal_catalog
.
unrestrictedGetResultValue
(
version
=
version
,
portal_type
=
portal_type
,
language
=
language
)
reference
=
reference
,
version
=
version
,
if
document
is
not
None
:
language
=
language
)
# document is already uploaded. So overrides file.
if
not
_checkPermission
(
Permissions
.
ModifyPortalContent
,
document
):
if
document
is
not
None
:
raise
Unauthorized
(
"[DMS] You are not allowed to update the existing document which has the same coordinates (id %s)"
%
document
.
getId
())
# document is already uploaded. So overrides file.
document
.
edit
(
file
=
kw
[
'file'
])
if
not
_checkPermission
(
Permissions
.
ModifyPortalContent
,
document
):
return
document
raise
Unauthorized
(
"[DMS] You are not allowed to update the existing document which has the same coordinates (id %s)"
%
document
.
getId
())
document
.
edit
(
file
=
kw
[
'file'
])
return
document
# Temp objects use the standard newContent from Folder
# Temp objects use the standard newContent from Folder
if
temp_object
:
if
temp_object
:
# For temp_object creation, use the standard method
# For temp_object creation, use the standard method
...
...
product/ERP5OOo/tests/testIngestion.py
View file @
9e2642a7
...
@@ -29,22 +29,25 @@
...
@@ -29,22 +29,25 @@
#
#
##############################################################################
##############################################################################
import
io
import
unittest
import
unittest
import
os
import
os
import
StringIO
from
cgi
import
FieldStorage
from
cgi
import
FieldStorage
from
lxml
import
etree
from
lxml
import
etree
from
AccessControl.SecurityManagement
import
newSecurityManager
from
AccessControl.SecurityManagement
import
newSecurityManager
from
AccessControl
import
Unauthorized
from
DateTime
import
DateTime
from
DateTime
import
DateTime
from
Products.ERP5Type.Utils
import
convertToUpperCase
from
Products.ERP5Type.Utils
import
convertToUpperCase
from
Products.ERP5Type.tests.ERP5TypeTestCase
import
(
from
Products.ERP5Type.tests.ERP5TypeTestCase
import
(
ERP5TypeTestCase
,
_getConversionServerUrlList
)
ERP5TypeTestCase
,
_getConversionServerUrlList
)
from
Products.CMFCore.WorkflowCore
import
WorkflowException
from
Products.ERP5Type.tests.Sequence
import
SequenceList
from
Products.ERP5Type.tests.Sequence
import
SequenceList
from
Products.ERP5Type.tests.utils
import
FileUpload
,
removeZODBPythonScript
,
\
from
Products.ERP5Type.tests.utils
import
FileUpload
,
removeZODBPythonScript
,
\
createZODBPythonScript
createZODBPythonScript
from
Products.ERP5OOo.OOoUtils
import
OOoBuilder
from
Products.ERP5OOo.OOoUtils
import
OOoBuilder
from
Products.CMFCore.utils
import
getToolByName
from
Products.CMFCore.utils
import
getToolByName
from
zExceptions
import
BadRequest
from
zExceptions
import
BadRequest
from
zExceptions
import
Redirect
import
ZPublisher.HTTPRequest
import
ZPublisher.HTTPRequest
from
unittest
import
expectedFailure
from
unittest
import
expectedFailure
import
urllib
import
urllib
...
@@ -78,6 +81,15 @@ class IngestionTestCase(ERP5TypeTestCase):
...
@@ -78,6 +81,15 @@ class IngestionTestCase(ERP5TypeTestCase):
'erp5_ingestion'
,
'erp5_ingestion_mysql_innodb_catalog'
,
'erp5_ingestion'
,
'erp5_ingestion_mysql_innodb_catalog'
,
'erp5_web'
,
'erp5_crm'
,
'erp5_dms'
)
'erp5_web'
,
'erp5_crm'
,
'erp5_dms'
)
def
afterSetUp
(
self
):
self
.
setSystemPreference
()
def
setSystemPreference
(
self
):
default_pref
=
self
.
getDefaultSystemPreference
()
default_pref
.
setPreferredRedirectToDocument
(
False
)
default_pref
.
setPreferredDocumentFilenameRegularExpression
(
FILENAME_REGULAR_EXPRESSION
)
default_pref
.
setPreferredDocumentReferenceRegularExpression
(
REFERENCE_REGULAR_EXPRESSION
)
def
beforeTearDown
(
self
):
def
beforeTearDown
(
self
):
# cleanup modules
# cleanup modules
module_id_list
=
"""web_page_module
module_id_list
=
"""web_page_module
...
@@ -136,13 +148,8 @@ class TestIngestion(IngestionTestCase):
...
@@ -136,13 +148,8 @@ class TestIngestion(IngestionTestCase):
self
.
portal_categories
=
self
.
getCategoryTool
()
self
.
portal_categories
=
self
.
getCategoryTool
()
self
.
portal_catalog
=
self
.
getCatalogTool
()
self
.
portal_catalog
=
self
.
getCatalogTool
()
self
.
createDefaultCategoryList
()
self
.
createDefaultCategoryList
()
self
.
setSystemPreference
()
self
.
setSimulatedNotificationScript
()
self
.
setSimulatedNotificationScript
()
super
(
TestIngestion
,
self
).
afterSetUp
()
def
setSystemPreference
(
self
):
default_pref
=
self
.
getDefaultSystemPreference
()
default_pref
.
setPreferredDocumentFilenameRegularExpression
(
FILENAME_REGULAR_EXPRESSION
)
default_pref
.
setPreferredDocumentReferenceRegularExpression
(
REFERENCE_REGULAR_EXPRESSION
)
def
setSimulatedNotificationScript
(
self
,
sequence
=
None
,
sequence_list
=
None
,
**
kw
):
def
setSimulatedNotificationScript
(
self
,
sequence
=
None
,
sequence_list
=
None
,
**
kw
):
"""
"""
...
@@ -2108,7 +2115,7 @@ class Base_contributeMixin:
...
@@ -2108,7 +2115,7 @@ class Base_contributeMixin:
"""
"""
person
=
self
.
portal
.
person_module
.
newContent
(
portal_type
=
'Person'
)
person
=
self
.
portal
.
person_module
.
newContent
(
portal_type
=
'Person'
)
empty_file_upload
=
ZPublisher
.
HTTPRequest
.
FileUpload
(
FieldStorage
(
empty_file_upload
=
ZPublisher
.
HTTPRequest
.
FileUpload
(
FieldStorage
(
fp
=
StringIO
.
String
IO
(),
fp
=
io
.
Bytes
IO
(),
environ
=
dict
(
REQUEST_METHOD
=
'PUT'
),
environ
=
dict
(
REQUEST_METHOD
=
'PUT'
),
headers
=
{
"content-disposition"
:
headers
=
{
"content-disposition"
:
"attachment; filename=empty;"
}))
"attachment; filename=empty;"
}))
...
@@ -2266,3 +2273,120 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
...
@@ -2266,3 +2273,120 @@ class TestBase_contributeWithSecurity(IngestionTestCase, Base_contributeMixin):
uf
.
_doAddUser
(
self
.
id
(),
self
.
newPassword
(),
[
'Associate'
,
'Assignor'
,
'Author'
],
[])
uf
.
_doAddUser
(
self
.
id
(),
self
.
newPassword
(),
[
'Associate'
,
'Assignor'
,
'Author'
],
[])
user
=
uf
.
getUserById
(
self
.
id
()).
__of__
(
uf
)
user
=
uf
.
getUserById
(
self
.
id
()).
__of__
(
uf
)
newSecurityManager
(
None
,
user
)
newSecurityManager
(
None
,
user
)
def
test_Base_contribute_mergeRevision
(
self
):
person
=
self
.
portal
.
person_module
.
newContent
(
portal_type
=
'Person'
)
ret
=
person
.
Base_contribute
(
redirect_to_context
=
True
,
synchronous_metadata_discovery
=
True
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
self
.
assertIn
(
(
'portal_status_message'
,
'PDF created successfully.'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
ret
).
query
))
document
,
=
self
.
portal
.
document_module
.
contentValues
()
self
.
assertEqual
(
(
document
.
getReference
(),
document
.
getLanguage
(),
document
.
getVersion
()),
(
'TEST'
,
'en'
,
'002'
))
self
.
assertEqual
(
document
.
getValidationState
(),
'draft'
)
document
.
setData
(
b''
)
self
.
tic
()
# when updating, the message is different
ret
=
person
.
Base_contribute
(
redirect_to_context
=
True
,
synchronous_metadata_discovery
=
True
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
self
.
assertIn
(
(
'portal_status_message'
,
'PDF updated successfully.'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
ret
).
query
))
document
,
=
self
.
portal
.
document_module
.
contentValues
()
self
.
assertEqual
(
(
document
.
getReference
(),
document
.
getLanguage
(),
document
.
getVersion
()),
(
'TEST'
,
'en'
,
'002'
))
self
.
assertEqual
(
document
.
getValidationState
(),
'draft'
)
self
.
assertIn
(
b'%PDF'
,
document
.
getData
())
# change to a state where user can not edit the document
document
.
setData
(
b''
)
document
.
share
()
self
.
tic
()
for
synchronous_metadata_discovery
in
True
,
False
:
with
self
.
assertRaises
(
Redirect
)
as
ctx
:
person
.
Base_contribute
(
redirect_to_context
=
True
,
synchronous_metadata_discovery
=
synchronous_metadata_discovery
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
self
.
assertIn
(
(
'portal_status_message'
,
'You are not allowed to update the existing document which has the same coordinates.'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
str
(
ctx
.
exception
)).
query
))
self
.
assertIn
(
(
'portal_status_level'
,
'error'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
str
(
ctx
.
exception
)).
query
))
# document is not updated
self
.
assertEqual
(
document
.
getData
(),
b''
)
# when using the script directly it's an error
with
self
.
assertRaisesRegex
(
Unauthorized
,
"You are not allowed to update the existing document which has the same coordinates"
):
person
.
Base_contribute
(
synchronous_metadata_discovery
=
synchronous_metadata_discovery
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
self
.
assertEqual
(
document
.
getData
(),
b''
)
def
test_Base_contribute_publication_state_unauthorized
(
self
):
# When user is not allowed to publish a document, they can not use publication_state="published"
# option of Base_contribute
user_id
=
self
.
id
()
+
'-author'
uf
=
self
.
portal
.
acl_users
uf
.
_doAddUser
(
user_id
,
self
.
newPassword
(),
[
'Author'
],
[])
user
=
uf
.
getUserById
(
user_id
).
__of__
(
uf
)
newSecurityManager
(
None
,
user
)
person
=
self
.
portal
.
person_module
.
newContent
(
portal_type
=
'Person'
)
# with redirect_to_context option (like in the UI Dialog), we have a nice
# status message
with
self
.
assertRaises
(
Redirect
)
as
ctx
:
person
.
Base_contribute
(
publication_state
=
'published'
,
redirect_to_context
=
True
,
synchronous_metadata_discovery
=
True
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
self
.
assertIn
(
(
'portal_status_message'
,
'You are not allowed to contribute document in that state.'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
str
(
ctx
.
exception
)).
query
))
self
.
assertIn
(
(
'portal_status_level'
,
'error'
),
urlparse
.
parse_qsl
(
urlparse
.
urlparse
(
str
(
ctx
.
exception
)).
query
))
# when using the script directly it's an error
with
self
.
assertRaisesRegex
(
WorkflowException
,
"Transition document_publication_workflow/publish unsupported"
):
person
.
Base_contribute
(
publication_state
=
'published'
,
synchronous_metadata_discovery
=
True
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
# when using asynchronous metadata discovery, an error occurs in activity,
# but not document is published
person
.
Base_contribute
(
publication_state
=
'published'
,
redirect_to_context
=
True
,
synchronous_metadata_discovery
=
False
,
file
=
makeFileUpload
(
'TEST-en-002.pdf'
))
with
self
.
assertRaisesRegex
(
Exception
,
"Transition document_publication_workflow/publish unsupported"
):
self
.
tic
()
self
.
assertEqual
(
{
doc
.
getValidationState
()
for
doc
in
self
.
portal
.
document_module
.
contentValues
()},
set
([
'draft'
]))
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment