Commit 6838c011 authored by Bram Schoenmakers's avatar Bram Schoenmakers

Merge pull request #47 from mruwek/undo

Introduce backup feature and `revert` command
parents 95861af2 7bf730a9
# Topydo - A todo.txt client written in Python.
# Copyright (C) 2014 - 2015 Bram Schoenmakers <me@bramschoenmakers.nl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import unittest
from datetime import date
from glob import glob
from six import u
from uuid import uuid4
from test.CommandTestCase import CommandTest
from topydo.commands.AddCommand import AddCommand
from topydo.commands.ArchiveCommand import ArchiveCommand
from topydo.commands.DeleteCommand import DeleteCommand
from topydo.commands.DoCommand import DoCommand
from topydo.commands.RevertCommand import RevertCommand
from topydo.lib.ChangeSet import ChangeSet
from topydo.lib.Config import config
from topydo.lib.TodoFile import TodoFile
from topydo.lib.TodoList import TodoList
class RevertCommandTest(CommandTest):
def setUp(self):
super(RevertCommandTest, self).setUp()
todos = [
"Foo",
"Bar",
"Baz",
]
self.todolist = TodoList(todos)
self.today = date.today()
self.tmp_name = str(uuid4().hex.upper()[0:6])
archive_filename = '/tmp/' + self.tmp_name + '_archive'
todo_filename = '/tmp/' + self.tmp_name + '_todo'
config(p_overrides={('topydo', 'archive_filename'): archive_filename,
('topydo', 'filename'): todo_filename, ('topydo', 'backup_count'): '5'})
self.archive_file = TodoFile(archive_filename)
self.archive = TodoList([])
def test_revert01(self):
backup = ChangeSet(p_call=['do 1'])
backup.add_todolist(self.todolist)
backup.add_archive(self.archive)
backup.timestamp = '1'
command = DoCommand(["1"], self.todolist, self.out, self.error, None)
command.execute()
archive_command = ArchiveCommand(self.todolist, self.archive)
archive_command.execute()
self.archive_file.write(self.archive.print_todos())
backup.save(self.todolist)
self.assertEqual(self.archive.print_todos(), "x {} Foo".format(self.today))
self.assertEqual(self.todolist.print_todos(), "Bar\nBaz")
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
result = TodoList(self.archive_file.read()).print_todos()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: do 1\n"))
self.assertEqual(result, "")
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz")
def test_revert02(self):
backup = ChangeSet(self.todolist, self.archive, ['do 1'])
backup.timestamp = '1'
command1 = DoCommand(["1"], self.todolist, self.out, self.error, None)
command1.execute()
archive_command1 = ArchiveCommand(self.todolist, self.archive)
archive_command1.execute()
self.archive_file.write(self.archive.print_todos())
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['do Bar'])
backup.timestamp = '2'
command2 = DoCommand(["Bar"], self.todolist, self.out, self.error, None)
command2.execute()
archive_command2 = ArchiveCommand(self.todolist, self.archive)
archive_command2.execute()
self.archive_file.write(self.archive.print_todos())
backup.save(self.todolist)
self.assertEqual(self.archive.print_todos(), "x {t} Foo\nx {t} Bar".format(t=self.today))
self.assertEqual(self.todolist.print_todos(), "Baz")
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
result = TodoList(self.archive_file.read()).print_todos()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: do Bar\n"))
self.assertEqual(result, "x {} Foo".format(self.today))
self.assertEqual(self.todolist.print_todos(), "Bar\nBaz")
def test_revert03(self):
""" Test behavior when no backup is found """
command = RevertCommand([], self.todolist, self.out, self.error)
command.execute()
self.assertEqual(self.errors, "No backup was found for the current state of {}\n".format(config().todotxt()))
def test_revert04(self):
""" Test trimming of the backup_file """
backup = ChangeSet(self.todolist, self.archive, ['add One'])
backup.timestamp = '1'
command1 = AddCommand(["One"], self.todolist, self.out, self.error, None)
command1.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Two'])
backup.timestamp = '2'
command2 = AddCommand(["Two"], self.todolist, self.out, self.error, None)
command2.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Three'])
backup.timestamp = '3'
command3 = AddCommand(["Three"], self.todolist, self.out, self.error, None)
command3.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Four'])
backup.timestamp = '4'
command4 = AddCommand(["Four"], self.todolist, self.out, self.error, None)
command4.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Five'])
backup.timestamp = '5'
command5 = AddCommand(["Five"], self.todolist, self.out, self.error, None)
command5.execute()
backup.save(self.todolist)
result = len(ChangeSet().backup_dict.keys())
self.assertEqual(result, 6)
backup = ChangeSet(self.todolist, self.archive, ['add Six'])
backup.timestamp = '6'
command6 = AddCommand(["Six"], self.todolist, self.out, self.error, None)
command6.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Seven'])
backup.timestamp = '7'
command7 = AddCommand(["Seven"], self.todolist, self.out, self.error, None)
command7.execute()
backup.save(self.todolist)
result = len(ChangeSet().backup_dict.keys())
self.assertEqual(result, 6)
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
backup = ChangeSet()
changesets = list(backup.backup_dict.keys())
changesets.remove('index')
index_timestamps = [change[0] for change in backup._get_index()]
result = list(set(index_timestamps) - set(changesets))
self.assertEqual(len(changesets), 4)
self.assertEqual(result, [])
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: add Seven\n"))
def test_revert05(self):
""" Test for possible backup collisions """
backup = ChangeSet(self.todolist, self.archive, ['add One'])
backup.timestamp = '1'
command1 = AddCommand(["One"], self.todolist, self.out, self.error, None)
command1.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Two'])
backup.timestamp = '2'
command2 = AddCommand(["Two"], self.todolist, self.out, self.error, None)
command2.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Three'])
backup.timestamp = '3'
command3 = AddCommand(["Three"], self.todolist, self.out, self.error, None)
command3.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['delete Three'])
backup.timestamp = '4'
command4 = DeleteCommand(["Three"], self.todolist, self.out, self.error, None)
command4.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Four'])
backup.timestamp = '5'
command4 = AddCommand(["Four"], self.todolist, self.out, self.error, None)
command4.execute()
backup.save(self.todolist)
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: add Four\n"))
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz\n{t} One\n{t} Two".format(t=self.today))
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: delete Three\n"))
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz\n{t} One\n{t} Two\n{t} Three".format(t=self.today))
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: add Three\n"))
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz\n{t} One\n{t} Two".format(t=self.today))
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: add Two\n"))
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz\n{t} One".format(t=self.today))
revert_command = RevertCommand([], self.todolist, self.out, self.error, None)
revert_command.execute()
self.assertEqual(self.errors, "")
self.assertTrue(self.output.endswith("Successfully reverted: add One\n"))
self.assertEqual(self.todolist.print_todos(), "Foo\nBar\nBaz")
def test_revert06(self):
""" Test attempt of deletion with non-existing backup key"""
backup = ChangeSet(self.todolist, self.archive, ['add One'])
backup.timestamp = '1'
command1 = AddCommand(["One"], self.todolist, self.out, self.error, None)
command1.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Two'])
backup.timestamp = '2'
command2 = AddCommand(["Two"], self.todolist, self.out, self.error, None)
command2.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['add Three'])
backup.timestamp = '3'
command3 = AddCommand(["Three"], self.todolist, self.out, self.error, None)
command3.execute()
backup.save(self.todolist)
backup = ChangeSet(self.todolist, self.archive, ['delete Three'])
backup.timestamp = '4'
command4 = DeleteCommand(["Three"], self.todolist, self.out, self.error, None)
command4.execute()
backup.save(self.todolist)
backup = ChangeSet()
backup.delete('Foo')
changesets = list(backup.backup_dict.keys())
changesets.remove('index')
index_timestamps = [change[0] for change in backup._get_index()]
result = list(set(index_timestamps) - set(changesets))
self.assertEqual(len(changesets), 4)
self.assertEqual(result, [])
self.assertEqual(self.errors, "")
def test_backup_config01(self):
config(p_overrides={('topydo', 'backup_count'): '1'})
self.assertEqual(config().backup_count(), 1)
def test_backup_config02(self):
config(p_overrides={('topydo', 'backup_count'): '0'})
self.assertEqual(config().backup_count(), 0)
def test_backup_config03(self):
config(p_overrides={('topydo', 'backup_count'): '-88'})
self.assertEqual(config().backup_count(), 0)
def test_backup_config04(self):
config(p_overrides={('topydo', 'backup_count'): 'foo'})
self.assertEqual(config().backup_count(), 5)
def test_help(self):
command = RevertCommand(["help"], self.todolist, self.out, self.error)
command.execute()
self.assertEqual(self.output, "")
self.assertEqual(self.errors, command.usage() + "\n\n" + command.help() + "\n")
def tearDown(self):
for filename in glob('/tmp/' + self.tmp_name + '*'):
os.remove(filename)
if __name__ == '__main__':
unittest.main()
......@@ -6,6 +6,7 @@ default_command = ls
; archive_filename = done.txt
colors = 1
identifiers = linenumber ; or: text
backup_count = 5
[add]
auto_creation_date = 1
......
......@@ -47,6 +47,7 @@ _SUBCOMMAND_MAP = {
'postpone': 'PostponeCommand',
'pri': 'PriorityCommand',
'quit': 'ExitCommand',
'revert': 'RevertCommand',
'rm': 'DeleteCommand',
'sort': 'SortCommand',
'tag': 'TagCommand',
......
......@@ -25,6 +25,7 @@ from six import PY2
from six.moves import input
MAIN_OPTS = "ac:d:ht:v"
READ_ONLY_COMMANDS = ('List', 'ListContext', 'ListProject')
def usage():
......@@ -56,6 +57,7 @@ Available commands:
* listprojects (lsprj)
* postpone
* pri
* revert
* sort
* tag
......@@ -104,6 +106,7 @@ from topydo.commands.SortCommand import SortCommand
from topydo.lib import TodoFile
from topydo.lib import TodoList
from topydo.lib import TodoListBase
from topydo.lib.ChangeSet import ChangeSet
from topydo.lib.Utils import escape_ansi
......@@ -119,6 +122,7 @@ class CLIApplicationBase(object):
self.todolist = TodoList.TodoList([])
self.todofile = None
self.do_archive = True
self.backup = None
def _usage(self):
usage()
......@@ -170,6 +174,9 @@ class CLIApplicationBase(object):
archive_file = TodoFile.TodoFile(config().archive())
archive = TodoListBase.TodoListBase(archive_file.read())
if self.backup:
self.backup.add_archive(archive)
if archive:
command = ArchiveCommand(self.todolist, archive)
command.execute()
......@@ -194,6 +201,11 @@ class CLIApplicationBase(object):
Execute a subcommand with arguments. p_command is a class (not an
object).
"""
cmds_wo_backup = tuple(cmd + 'Command' for cmd in ('Revert', ) + READ_ONLY_COMMANDS)
if config().backup_count() > 0 and p_command and not p_command.__module__.endswith(cmds_wo_backup):
call = [p_command.__module__.lower()[16:-7]] + p_args # strip "topydo.commands" and "Command"
self.backup = ChangeSet(self.todolist, p_call=call)
command = p_command(
p_args,
self.todolist,
......@@ -222,7 +234,12 @@ class CLIApplicationBase(object):
if config().keep_sorted():
self._execute(SortCommand, [])
if self.backup:
self.backup.save(self.todolist)
self.todofile.write(self.todolist.print_todos())
self.backup = None
def run(self):
raise NotImplementedError
# Topydo - A todo.txt client written in Python.
# Copyright (C) 2014 - 2015 Bram Schoenmakers <me@bramschoenmakers.nl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from topydo.lib.Command import Command
from topydo.lib.ChangeSet import ChangeSet
from topydo.lib import TodoFile
from topydo.lib import TodoList
from topydo.lib.Config import config
class RevertCommand(Command):
def __init__(self, p_args, p_todolist,
p_out=lambda a: None,
p_err=lambda a: None,
p_prompt=lambda a: None):
super(RevertCommand, self).__init__(
p_args, p_todolist, p_out, p_err, p_prompt=lambda a: None)
def execute(self):
if not super(RevertCommand, self).execute():
return False
archive_file = TodoFile.TodoFile(config().archive())
archive = TodoList.TodoList(archive_file.read())
last_change = ChangeSet()
try:
last_change.get_backup(self.todolist)
last_change.apply(self.todolist, archive)
archive_file.write(archive.print_todos())
last_change.delete()
self.out("Successfully reverted: " + last_change.call)
except (ValueError, KeyError):
self.error('No backup was found for the current state of ' + config().todotxt())
last_change.close()
def usage(self):
return """Synopsis: revert"""
def help(self):
return """\
Reverts the last command.
"""
# Topydo - A todo.txt client written in Python.
# Copyright (C) 2014 - 2015 Bram Schoenmakers <me@bramschoenmakers.nl>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
""" This module serves for managing todo and archive changesets. """
import json
import time
import zlib
from copy import deepcopy
from hashlib import sha1
from os import path
from topydo.lib.Config import config
from topydo.lib.TodoList import TodoList
def hash_todolist(p_todolist):
""" Calculates hash for TodoList.TodoList object. """
todolist_hash = sha1(p_todolist.print_todos().encode('utf-8')).hexdigest()
return todolist_hash
def get_backup_path():
""" Returns full path and filename of backup file """
dirname, filename = path.split(path.splitext(config().todotxt())[0])
filename = '.' + filename + '.bak'
return path.join(dirname, filename)
class ChangeSet(object):
""" Class for operations related with backup management. """
def __init__(self, p_todolist=None, p_archive=None, p_call=[]):
self.todolist = deepcopy(p_todolist)
self.archive = deepcopy(p_archive)
self.timestamp = str(int(time.time()))
self.call = ' '.join(p_call)
try:
self.json_file = open(get_backup_path(), 'r+b')
except IOError:
self.json_file = open(get_backup_path(), 'w+b')
self._read()
def _read(self):
"""
Reads backup file from json_file property and sets backup_dict property
with data decompressed and deserialized from that file. If no usable
data is found backup_dict is set to the empty dict.
"""
self.json_file.seek(0)
try:
data = zlib.decompress(self.json_file.read())
self.backup_dict = json.loads(data.decode('utf-8'))
except (EOFError, zlib.error):
self.backup_dict = {}
def _write(self):
"""
Writes data from backup_dict property in serialized and compressed form
to backup file pointed in json_file property.
"""
self.json_file.seek(0)
self.json_file.truncate()
dump = json.dumps(self.backup_dict)
dump_c = zlib.compress(dump.encode('utf-8'))
self.json_file.write(dump_c)
def add_archive(self, p_archive):
""" Sets deep copy of p_archive as archive attribute. """
self.archive = deepcopy(p_archive)
def add_todolist(self, p_todolist):
""" Sets deep copy of p_todolist as todolist attribute. """
self.todolist = deepcopy(p_todolist)
def save(self, p_todolist):
"""
Saves a tuple with archive, todolist and command with its arguments
into the backup file with unix timestamp as the key. Tuple is then
indexed in backup file with combination of hash calculated from
p_todolist and unix timestamp. Backup file is closed afterwards.
"""
self._trim()
current_hash = hash_todolist(p_todolist)
list_todo = (self.todolist.print_todos()+'\n').splitlines(True)
list_archive = (self.archive.print_todos()+'\n').splitlines(True)
self.backup_dict[self.timestamp] = (list_todo, list_archive, self.call)
index = self._get_index()
index.insert(0, (self.timestamp, current_hash))
self._save_index(index)
self._write()
self.close()
def delete(self, p_timestamp=None):
""" Removes backup from the backup file. """
timestamp = p_timestamp or self.timestamp
index = self._get_index()
try:
del self.backup_dict[timestamp]
index.remove(index[[change[0] for change in index].index(timestamp)])
self._save_index(index)
self._write()
except KeyError:
pass
def _get_index(self):
try:
index = self.backup_dict['index']
except KeyError:
self.backup_dict['index'] = []
index = self.backup_dict['index']
return index
def _save_index(self, p_index):
"""
Saves index of backups supplied in p_index into the backup_file
property with 'index' as the key.
"""
self.backup_dict['index'] = p_index
def _trim(self):
"""
Removes oldest backups that exceed the limit configured in backup_count
option.
"""
index = self._get_index()
backup_limit = config().backup_count() - 1
for changeset in index[backup_limit:]:
self.delete(changeset[0])
def get_backup(self, p_todolist):
"""
Retrieves a backup for p_todolist from backup file and sets todolist,
archive and call attributes to appropriate data from it.
"""
change_hash = hash_todolist(p_todolist)
index = self._get_index()
self.timestamp = index[[change[1] for change in index].index(change_hash)][0]
d = self.backup_dict[self.timestamp]
self.todolist = TodoList(d[0])
self.archive = TodoList(d[1])
self.call = d[2]
def apply(self, p_todolist, p_archive):
""" Applies backup on supplied p_todolist. """
if self.todolist:
p_todolist.replace(self.todolist.todos())
if self.archive:
p_archive.replace(self.archive.todos())
def close(self):
""" Closes backup file. """
self.json_file.close()
......@@ -57,6 +57,7 @@ class _Config:
'filename': 'todo.txt',
'archive_filename': 'done.txt',
'identifiers': 'linenumber',
'backup_count': '5',
# add
'auto_creation_date': '1',
......@@ -139,6 +140,15 @@ class _Config:
def identifiers(self):
return self.cp.get('topydo', 'identifiers')
def backup_count(self):
try:
value = self.cp.getint('topydo', 'backup_count')
if value < 0:
value = 0
return value
except ValueError:
return int(self.defaults['backup_count'])
def list_limit(self):
try:
return self.cp.getint('ls', 'list_limit')
......@@ -256,7 +266,6 @@ class _Config:
except ValueError:
return self.defaults['auto_creation_date'] == '1'
def config(p_path=None, p_overrides=None):
"""
Retrieve the config instance.
......
......@@ -173,6 +173,12 @@ class TodoListBase(object):
self._todos = []
self.dirty = True
def replace(self, p_todos):
""" Replaces whole todolist with todo objects supplied as p_todos. """
self.erase()
self.add_todos(p_todos)
self.dirty = True
def count(self):
""" Returns the number of todos on this list. """
return len(self._todos)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment