Commit b771388a authored by Rafael Monnerat's avatar Rafael Monnerat

slapos.package: Include test for AptGet and Zypper Handlers

parent eb1cd142
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
from slapos.package.distribution import PackageManager, AptGet, Zypper, \ from slapos.package.distribution import PackageManager, AptGet, Zypper, \
UnsupportedOSException UnsupportedOSException
import os import os
import glob
import unittest import unittest
def _fake_call(self, *args, **kw): def _fake_call(self, *args, **kw):
...@@ -71,6 +72,10 @@ class testPackageManager(unittest.TestCase): ...@@ -71,6 +72,10 @@ class testPackageManager(unittest.TestCase):
def setUp(self): def setUp(self):
PackageManager._call = _fake_call PackageManager._call = _fake_call
AptGet.source_list_path = "/tmp/test_distribution_sources.list"
AptGet.source_list_d_path = "/tmp/test_distribution_sources.list.d"
AptGet.trusted_gpg_d_path = "/tmp/test_distribution_trusted.gpg.d"
def testGetDistributionHandler(self): def testGetDistributionHandler(self):
package_manager = PackageManager() package_manager = PackageManager()
...@@ -169,3 +174,320 @@ class testPackageManager(unittest.TestCase): ...@@ -169,3 +174,320 @@ class testPackageManager(unittest.TestCase):
package_manager._updateSystem() package_manager._updateSystem()
self.assertEquals(handler.called, ["updateSystem"]) self.assertEquals(handler.called, ["updateSystem"])
def testAptGetPurgeRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append("called")
source_list = "/tmp/test_distribution_sources.list"
source_list_d = "/tmp/test_distribution_sources.list.d"
if not os.path.exists(source_list_d):
os.mkdir(source_list_d)
open("%s/couscous.list" % source_list_d, "w+").write("# test")
handler.purgeRepository(dummy_caller)
self.assertTrue(os.path.exists(source_list))
self.assertEquals(open(source_list, "r").read(),
"# Removed all")
self.assertEquals(glob.glob("%s/*" % source_list_d), [])
def testAptGetAddRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append("called")
source_list_d = "/tmp/test_distribution_sources.list.d"
if os.path.exists(source_list_d):
os.rmdir(source_list_d)
handler.addRepository(dummy_caller, "http://test main", "slapos")
self.assertTrue(os.path.exists(source_list_d))
self.assertEquals(open("%s/slapos.list" % source_list_d, "r").read(),
"deb http://test main")
# def addKey(self, caller, url, alias):
# """ Download and add a gpg key """
# gpg_path = open("%s/%s.gpg" % self.trusted_gpg_d_path, alias)
# if os.path.exists(gpg_path):
# # File already exists, skip
# return
# raise NotImplementedError("Download part is missing")
def testAptGetUpdateRepository(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
def testAptGetInstallSoftwareList(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.installSoftwareList(dummy_caller, ["slapos", "re6st"])
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "install", "-y", "slapos", "re6st"], command)
self.assertEquals(extra, {'stdout': None})
# def isUpgradable(self, caller, name):
# output, err = caller(["apt-get", "upgrade", "--dry-run"])
# for line in output.splitlines():
# if line.startswith("Inst %s" % name):
# return True
# return False
#
def testAptGetUpdateSoftware(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSoftware(dummy_caller)
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "upgrade"], command)
self.assertEquals(extra, {'stdout': None})
def testAptGetUpdateSystem(self):
handler = AptGet()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSystem(dummy_caller)
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['apt-get', 'update'], command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(["apt-get", "dist-upgrade", "-y"], command)
self.assertEquals(extra, {'stdout': None})
def testZypperPurgeRepository_NoRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
return "", ""
handler.purgeRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
def testZypperPurgeRepository_3Repository(self):
handler = Zypper()
called = []
case = []
repository_list_string = [
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | re6st | re6st | Yes | Yes
2 | slapos | slapos | Yes | Yes
3 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | slapos | slapos | Yes | Yes
2 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
1 | suse | suse | Yes | Yes
""",
"""# | Alias | Name | Enabled | Refresh
--+--------+--------+---------+--------
"""]
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
if args[0] == ['zypper', 'rr', '1']:
case.append(1)
return repository_list_string[len(case)], ""
handler.purgeRepository(dummy_caller)
self.assertEquals(len(called), 7)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[2][0][0]
extra = called[2][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[3][0][0]
extra = called[3][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[4][0][0]
extra = called[4][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
command = called[5][0][0]
extra = called[5][1]
self.assertEquals(['zypper', 'rr', '1'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[6][0][0]
extra = called[6][1]
self.assertEquals(['zypper', 'lr'],
command)
self.assertEquals(extra, {'stderr': -1, 'stdout': -1})
def purgeRepository(self, caller):
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while listing.count('\n') > 2:
output, err = caller(['zypper', 'rr', '1'], stdout=None)
listing, err = caller(['zypper', 'lr'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
def testZypperAddRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.addRepository(dummy_caller, "http://...", "slapos")
handler.addRepository(dummy_caller, "http://.../2", "re6st-unsafe")
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', 'ar', '-fc', 'http://...', 'slapos'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(['zypper', 'ar', '-fc', '--no-gpgcheck' ,
'http://.../2', 're6st-unsafe'], command)
self.assertEquals(extra, {'stdout': None})
# def addKey(self, caller, url, alias):
# """ Add gpg or key """
# raise NotImplementedError("Not implemented for this distribution")
#
def testZypperUpdateRepository(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateRepository(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'in', '-Dly'],
command)
self.assertEquals(extra, {'stdout': None})
# def isUpgradable(self, caller, name):
# output, err = caller(['zypper', '--gpg-auto-import-keys', 'up', '-ly'],
# stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# for line in output.splitlines():
# if line.startswith("'%s' is already installed." % name):
# return False
# return True
#
def testZypperInstallSoftwareList(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.installSoftwareList(dummy_caller, ["slapos", "re6st"])
self.assertEquals(len(called), 2)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'in', '-Dly'],
command)
self.assertEquals(extra, {'stdout': None})
command = called[1][0][0]
extra = called[1][1]
self.assertEquals(command,
['zypper', '--gpg-auto-import-keys', 'in', '-ly', 'slapos', 're6st'])
self.assertEquals(extra, {'stdout': None})
def testZypperUpdateSoftware(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSoftware(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'up', '-ly'],
command)
self.assertEquals(extra, {'stdout': None})
def testZypperUpdateSystem(self):
handler = Zypper()
called = []
def dummy_caller(*args, **kwargs):
called.append((args, kwargs))
handler.updateSystem(dummy_caller)
self.assertEquals(len(called), 1)
command = called[0][0][0]
extra = called[0][1]
self.assertEquals(['zypper', '--gpg-auto-import-keys', 'dup', '-ly'],
command)
self.assertEquals(extra, {'stdout': None})
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment