Commit ad0fbcdf authored by Nicolas Wavrant's avatar Nicolas Wavrant

TrashTool: add function to restore objects

parent 707eaed2
...@@ -355,6 +355,32 @@ class TestTrashTool(ERP5TypeTestCase): ...@@ -355,6 +355,32 @@ class TestTrashTool(ERP5TypeTestCase):
self.assertTrue(subcat is not None) self.assertTrue(subcat is not None)
sequence.edit(subcat_path=subcat.getPath()) sequence.edit(subcat_path=subcat.getPath())
def stepDeleteBaseCategory(self, sequence=None, sequence_list=None, **kw):
pc = self.getCategoryTool()
pc.manage_delObjects(ids=[sequence.get('bc_id')])
def stepRestore(self, sequence=None, sequence_list=None, **kw):
trash_id = sequence.get('trash_id')
trash = self.getTrashTool()
trashbin = trash._getOb(trash_id, None)
bc_id = sequence.get('bc_id')
trash.restoreObject(trashbin, ['portal_categories_items'], bc_id)
def stepCheckRestore(self, sequence=None, sequence_list=None, **kw):
bc_id = sequence.get('bc_id')
bc = self.portal.portal_categories[bc_id]
self.assertTrue(
sorted(bc.objectIds()) == sorted(sequence.get('category_id_list'))
)
self.assertEqual(
len(
self.portal.portal_catalog(
portal_type='Category',
parent_uid=bc.getUid()
)
), 10
)
# tests # tests
def test_01_checkTrashBinCreation(self, quiet=quiet, run=run_all_test): def test_01_checkTrashBinCreation(self, quiet=quiet, run=run_all_test):
if not run: return if not run: return
...@@ -477,6 +503,30 @@ class TestTrashTool(ERP5TypeTestCase): ...@@ -477,6 +503,30 @@ class TestTrashTool(ERP5TypeTestCase):
sequence_list.addSequenceString(sequence_string) sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet) sequence_list.play(self, quiet=quiet)
def test_07_checkRestore(self, quiet=quiet, run=run_all_test):
if not run: return
if not quiet:
message = 'Test Check Backup Without Subobjects'
ZopeTestCase._print('\n%s ' % message)
LOG('Testing... ', 0, message)
sequence_list = SequenceList()
sequence_string = '\
CheckTrashToolExists \
CreateTrashBin \
AddBaseCategory \
AddCategories \
Tic \
BackupObjectsWithKeepingSubobjects \
Tic \
CheckObjectBackupWithSubObjects \
DeleteBaseCategory \
Tic \
Restore \
Tic \
CheckRestore \
'
sequence_list.addSequenceString(sequence_string)
sequence_list.play(self, quiet=quiet)
def test_suite(): def test_suite():
......
...@@ -189,6 +189,76 @@ class TrashTool(BaseTool): ...@@ -189,6 +189,76 @@ class TrashTool(BaseTool):
) )
return trashbin return trashbin
security.declarePrivate('restoreObject')
def restoreObject(self, trashbin, container_path, object_id):
"""
Restore an object from the trash bin (copy it under portal)
"""
portal = self.getPortalObject()
# recreate path of the backup object if necessary
backup_object_container = portal
for path in container_path:
if path.endswith('_items'):
path = path[0:-len('_items')]
if path not in backup_object_container.objectIds():
if not hasattr(aq_base(backup_object_container), "newContent"):
backup_object_container.manage_addFolder(id=path,)
backup_object_container = backup_object_container._getOb(path)
else:
backup_object_container = backup_object_container.newContent(
portal_type='Folder',
id=path,
)
else:
backup_object_container = backup_object_container._getOb(path)
# backup the object
# here we choose export/import to copy because cut/paste
# do too many things and check for what we want to do
object_path = container_path + [object_id]
obj = trashbin.restrictedTraverse(object_path, None)
if obj is not None:
connection = obj._p_jar
o = obj
while connection is None:
o = o.aq_parent
connection=o._p_jar
if obj._p_oid is None:
LOG("Trash Tool backupObject", WARNING,
"Trying to backup uncommitted object %s" % object_path)
return {}
if isinstance(obj, Broken):
LOG("Trash Tool backupObject", WARNING,
"Can't backup broken object %s" % object_path)
klass = obj.__class__
if klass.__module__[:27] in ('Products.ERP5Type.Document.',
'erp5.portal_type'):
# meta_type is required so that a broken object
# can be removed properly from a BTreeFolder2
# (unfortunately, we can only guess it)
klass.meta_type = 'ERP5' + re.subn('(?=[A-Z])', ' ',
klass.__name__)[0]
return
copy = connection.exportFile(obj._p_oid)
# import object in trash
connection = backup_object_container._p_jar
o = backup_object_container
while connection is None:
o = o.aq_parent
connection=o._p_jar
copy.seek(0)
try:
backup = connection.importFile(copy)
if hasattr(aq_base(backup), 'isIndexable'):
del backup.isIndexable
backup_object_container._setObject(object_id, backup)
except (AttributeError, ImportError):
# XXX we can go here due to formulator because attribute
# field_added doesn't not exists on parent if it is a Trash
# Folder and not a Form, or a module for the old object is
# already removed, and we cannot backup the object
LOG("Trash Tool backupObject", WARNING,
"Can't backup object %s" % object_path)
security.declareProtected(Permissions.ManagePortal, 'getTrashBinObjectsList') security.declareProtected(Permissions.ManagePortal, 'getTrashBinObjectsList')
def getTrashBinObjectsList(self, trashbin): def getTrashBinObjectsList(self, trashbin):
""" """
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment