Commit 83407df8 authored by Lisa Casino's avatar Lisa Casino

slapos/collect: add two tests in test_collect.py

- test_process_in_progress_disk_usage: check that the function update_folder_size() doesn't start if the process "du" is still in progress.
- test_time_cycle: check that the parameter time_cycle is respected.
parent 2432ef2c
......@@ -26,6 +26,7 @@
##############################################################################
from slapos.util import mkdir_p
from datetime import datetime, timedelta
import csv
import six
import mock
......@@ -40,6 +41,7 @@ import tempfile
import slapos.slap
import psutil
import sqlite3
import subprocess
from time import strftime
from slapos.collect import entity, snapshot, db, reporter
from slapos.cli.entry import SlapOSApp
......@@ -495,6 +497,11 @@ class TestCollectSnapshot(unittest.TestCase):
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def getFakeUser(self, disk_snapshot_params={}):
os.mkdir("%s/fakeuser0" % self.instance_root)
return entity.User("fakeuser0",
"%s/fakeuser0" % self.instance_root, disk_snapshot_params )
def test_process_snapshot(self):
process = psutil.Process(os.getpid())
process_snapshot = snapshot.ProcessSnapshot(process)
......@@ -534,7 +541,62 @@ class TestCollectSnapshot(unittest.TestCase):
use_quota=True)
disk_snapshot.update_folder_size()
self.assertNotEqual(disk_snapshot.disk_usage, 0)
def test_process_in_progress_disk_usage(self):
pid_file = os.path.join(self.instance_root, 'sleep.pid')
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file)
command = 'sleep 1h'
process = subprocess.Popen(command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True)
with open(pid_file, 'w') as fpid:
pid = fpid.write(str(process.pid))
self.assertTrue(os.path.isfile(pid_file))
self.assertEqual(disk_snapshot.update_folder_size(), None)
disk_snapshot = snapshot.FolderSizeSnapshot(self.instance_root, pid_file,
use_quota=True)
self.assertEqual(disk_snapshot.update_folder_size(), None)
process.terminate()
def test_time_cycle(self):
disk_snapshot_params = {'enable': True, 'time_cycle': 3600, 'testing': True}
user = self.getFakeUser(disk_snapshot_params)
database = db.Database(self.instance_root, create=True)
date = datetime.utcnow().date()
time = datetime.utcnow().time().strftime("%H:%M:%S")
time_earlier = (datetime.utcnow() - \
timedelta(hours=3)).time().strftime("%H:%M:%S")
database.connect()
database.inserFolderSnapshot('fakeuser0', '1.0', date, time_earlier)
database.commit()
database.close()
# check that _insertDiskSnapShot called update_folder_size
with mock.patch('slapos.collect.snapshot.FolderSizeSnapshot.update_folder_size'
) as update_folder_size_call:
user._insertDiskSnapShot(database, date, time)
update_folder_size_call.assert_called_once()
time_earlier = (datetime.utcnow() - \
timedelta(minutes=10)).time().strftime("%H:%M:%S")
database.connect()
database.inserFolderSnapshot('fakeuser0', '1.0', date, time_earlier)
database.commit()
database.close()
# check that _insertDiskSnapShot stop before calling update_folder_size
with mock.patch('slapos.collect.snapshot.FolderSizeSnapshot.update_folder_size'
) as update_folder_size_call:
user._insertDiskSnapShot(database, date, time)
update_folder_size_call.assert_not_called()
def test_process_snapshot_broken_process(self):
self.assertRaises(AssertionError,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment