Commit 8fd73f9a authored by Jérome Perrin's avatar Jérome Perrin

standalone,testcase: integrate nxdbom

This needs nxdbom to be installed and to be in PATH. When enabled, this
will run nxdbom after building software to produce SBOM both in text and
cyclonedx formats.
parent 19903d3e
# coding: utf-8
import errno
import logging
import os
import shlex
import subprocess
from zope.interface import implementer
from slapos.manager import interface
logger = logging.getLogger(__name__)
@implementer(interface.IManager)
class NxdBOMManager(object):
"""Runs nxd-bom after running software installation.
"""
def __init__(self, config):
self.config = config
def software(self, software):
pass
def softwareTearDown(self, software):
try:
installation_time = os.stat(
os.path.join(software.software_path, '.completed')).st_mtime
except OSError as e:
if e.errno == errno.ENOENT:
return
raise
for f, o in (
('text', 'nxdbom.txt'),
('cyclonedx-json', 'nxdbom.cdx.json'),
):
output_file = os.path.join(software.software_path, o)
if os.path.exists(output_file) \
and os.stat(output_file).st_mtime >= installation_time:
logger.debug('%s already up to date', output_file)
continue
args = [
'nxdbom',
'--format',
f,
'--output',
output_file,
'software',
software.software_path,
]
cmd = ' '.join([shlex.quote(a) for a in args])
logger.info('Running: %s', cmd)
subprocess.check_call(args)
def format(self, computer):
pass
def formatTearDown(self, computer):
pass
def instance(self, partition):
pass
def instanceTearDown(self, instance):
pass
def report(self, partition):
pass
Manager = NxdBOMManager
\ No newline at end of file
...@@ -212,13 +212,24 @@ class SlapOSConfigWriter(ConfigWriter): ...@@ -212,13 +212,24 @@ class SlapOSConfigWriter(ConfigWriter):
config += "partition = {pfc.partition}\n".format(pfc=pfc) config += "partition = {pfc.partition}\n".format(pfc=pfc)
yield config yield config
def _getManagerConfiguration(self):
# type: () -> Iterable[str]
for manager, manager_config in self._standalone_slapos._manager_dict.items():
yield '[manager-{manager}]'.format(manager=manager)
for k, v in (manager_config or {}).items():
yield '{k} = {v}'.format(k=k, v=v)
def writeConfig(self, path): def writeConfig(self, path):
# type: (str) -> None # type: (str) -> None
# TODO: use configparser instead of building with text
standalone_slapos = self._standalone_slapos standalone_slapos = self._standalone_slapos
read_only_shared_part_list = '\n '.join( # pylint: disable=unused-variable; used in format() read_only_shared_part_list = '\n '.join( # pylint: disable=unused-variable; used in format()
standalone_slapos._shared_part_list) standalone_slapos._shared_part_list)
partition_forward_configuration = '\n'.join(self._getPartitionForwardConfiguration()) partition_forward_configuration = '\n'.join(self._getPartitionForwardConfiguration())
has_ipv6_range = ('false', 'true')[standalone_slapos._partitions_have_ipv6_range] has_ipv6_range = ('false', 'true')[standalone_slapos._partitions_have_ipv6_range]
manager_list = '\n '.join(standalone_slapos._manager_dict.keys())
manager_configuration = '\n'.join(self._getManagerConfiguration())
with open(path, 'w') as f: with open(path, 'w') as f:
f.write( f.write(
textwrap.dedent( textwrap.dedent(
...@@ -237,6 +248,8 @@ class SlapOSConfigWriter(ConfigWriter): ...@@ -237,6 +248,8 @@ class SlapOSConfigWriter(ConfigWriter):
pidfile_instance = {standalone_slapos._software_pid} pidfile_instance = {standalone_slapos._software_pid}
pidfile_report = {standalone_slapos._report_pid} pidfile_report = {standalone_slapos._report_pid}
forbid_supervisord_automatic_launch = true forbid_supervisord_automatic_launch = true
manager_list =
{manager_list}
[slapformat] [slapformat]
input_definition_file = {standalone_slapos._slapformat_definition} input_definition_file = {standalone_slapos._slapformat_definition}
...@@ -255,6 +268,8 @@ class SlapOSConfigWriter(ConfigWriter): ...@@ -255,6 +268,8 @@ class SlapOSConfigWriter(ConfigWriter):
local_software_release_root = {standalone_slapos._local_software_release_root} local_software_release_root = {standalone_slapos._local_software_release_root}
{partition_forward_configuration} {partition_forward_configuration}
{manager_configuration}
""").format(**locals())) """).format(**locals()))
...@@ -402,10 +417,11 @@ class StandaloneSlapOS(object): ...@@ -402,10 +417,11 @@ class StandaloneSlapOS(object):
instance_root=None, instance_root=None,
shared_part_root=None, shared_part_root=None,
partition_forward_configuration=(), partition_forward_configuration=(),
manager_dict=None,
slapos_bin='slapos', slapos_bin='slapos',
local_software_release_root=os.sep, local_software_release_root=os.sep,
): ):
# type: (str, str, int, str, Iterable[str], Optional[str], Optional[str], Optional[str], Iterable[Union[PartitionForwardConfiguration, PartitionForwardAsPartitionConfiguration]], str, str) -> None # type: (str, str, int, str, Iterable[str], Optional[str], Optional[str], Optional[str], Iterable[Union[PartitionForwardConfiguration, PartitionForwardAsPartitionConfiguration]], Dict[str, Optional[Dict[str, str]]], str, str) -> None
"""Constructor, creates a standalone slapos in `base_directory`. """Constructor, creates a standalone slapos in `base_directory`.
Arguments: Arguments:
...@@ -417,6 +433,7 @@ class StandaloneSlapOS(object): ...@@ -417,6 +433,7 @@ class StandaloneSlapOS(object):
* `instance_root` -- directory to create instances, default to "inst" in `base_directory` * `instance_root` -- directory to create instances, default to "inst" in `base_directory`
* `shared_part_root` -- directory to hold shared parts software, default to "shared" in `base_directory`. * `shared_part_root` -- directory to hold shared parts software, default to "shared" in `base_directory`.
* `partition_forward_configuration` -- configuration of partition request forwarding to external SlapOS master. * `partition_forward_configuration` -- configuration of partition request forwarding to external SlapOS master.
* `manager_dict` -- managers to enable, optionally with their configuration.
* `slapos_bin` -- slapos executable to use, default to "slapos" (thus depending on the runtime PATH). * `slapos_bin` -- slapos executable to use, default to "slapos" (thus depending on the runtime PATH).
* `local_software_release_root` -- root for local Software Releases paths in the SlapOS proxy, default to `/`. * `local_software_release_root` -- root for local Software Releases paths in the SlapOS proxy, default to `/`.
...@@ -436,6 +453,7 @@ class StandaloneSlapOS(object): ...@@ -436,6 +453,7 @@ class StandaloneSlapOS(object):
self._base_directory = base_directory self._base_directory = base_directory
self._shared_part_list = list(shared_part_list) self._shared_part_list = list(shared_part_list)
self._partition_forward_configuration = list(partition_forward_configuration) self._partition_forward_configuration = list(partition_forward_configuration)
self._manager_dict = manager_dict or {}
self._partition_count = -1 self._partition_count = -1
self._partition_base_name = 'slappart' self._partition_base_name = 'slappart'
self._ipv4_address = None self._ipv4_address = None
......
...@@ -153,7 +153,9 @@ def makeModuleSetUpAndTestCaseClass( ...@@ -153,7 +153,9 @@ def makeModuleSetUpAndTestCaseClass(
base_directory=base_directory, base_directory=base_directory,
server_ip=ipv4_address, server_ip=ipv4_address,
server_port=getPortFromPath(base_directory), server_port=getPortFromPath(base_directory),
shared_part_list=shared_part_list) shared_part_list=shared_part_list,
manager_dict={'nxdbom': None},
)
except PathTooDeepError: except PathTooDeepError:
raise RuntimeError( raise RuntimeError(
'base directory ( {} ) is too deep, try setting ' 'base directory ( {} ) is too deep, try setting '
...@@ -209,6 +211,10 @@ def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False): ...@@ -209,6 +211,10 @@ def installSoftwareUrlList(cls, software_url_list, max_retry=10, debug=False):
cls.slap.software_directory, cls.slap.software_directory,
'*', '*',
'.installed.cfg', '.installed.cfg',
)) + glob.glob(os.path.join(
cls.slap.software_directory,
'*',
'nxdbom*',
)) + glob.glob(os.path.join( )) + glob.glob(os.path.join(
cls.slap.shared_directory, cls.slap.shared_directory,
'*', '*',
......
...@@ -382,6 +382,14 @@ class SlapOSStandaloneTestCase(unittest.TestCase): ...@@ -382,6 +382,14 @@ class SlapOSStandaloneTestCase(unittest.TestCase):
# can set this class attribute to False to prevent this behavior. # can set this class attribute to False to prevent this behavior.
_auto_stop_standalone = True _auto_stop_standalone = True
def _getStandaloneSlapOSInitKw(self):
return {
'shared_part_list': [
os.path.expanduser(p) for p in os.environ.get(
'SLAPOS_TEST_SHARED_PART_LIST', '').split(os.pathsep) if p
],
}
def setUp(self): def setUp(self):
checkPortIsFree() checkPortIsFree()
working_dir = tempfile.mkdtemp(prefix=__name__) working_dir = tempfile.mkdtemp(prefix=__name__)
...@@ -390,10 +398,7 @@ class SlapOSStandaloneTestCase(unittest.TestCase): ...@@ -390,10 +398,7 @@ class SlapOSStandaloneTestCase(unittest.TestCase):
working_dir, working_dir,
SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV4,
SLAPOS_TEST_PORT, SLAPOS_TEST_PORT,
shared_part_list=[ **self._getStandaloneSlapOSInitKw()
os.path.expanduser(p) for p in os.environ.get(
'SLAPOS_TEST_SHARED_PART_LIST', '').split(os.pathsep) if p
],
) )
self.addCleanup(self.stopStandalone) self.addCleanup(self.stopStandalone)
self.standalone.format(1, SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV6) self.standalone.format(1, SLAPOS_TEST_IPV4, SLAPOS_TEST_IPV6)
...@@ -532,6 +537,59 @@ class TestSlapOSStandaloneSoftware(SlapOSStandaloneTestCase): ...@@ -532,6 +537,59 @@ class TestSlapOSStandaloneSoftware(SlapOSStandaloneTestCase):
self.assertIn("Red Green Blue", e.exception.args[0]['output']) self.assertIn("Red Green Blue", e.exception.args[0]['output'])
class TestSlapOSStandaloneSoftwareManager(SlapOSStandaloneTestCase):
def _getStandaloneSlapOSInitKw(self):
return dict(super(
TestSlapOSStandaloneSoftwareManager, self)._getStandaloneSlapOSInitKw(),
manager_dict={
'nxdbom': None,
}
)
def test_install_software_with_nxdbom_manager(self):
with tempfile.NamedTemporaryFile(suffix="-%s.cfg" % self.id()) as f:
f.write(
textwrap.dedent(
'''
[buildout]
parts = instance plone.recipe.command
newest = false
[plone.recipe.command]
recipe = zc.recipe.egg
[instance]
recipe = plone.recipe.command
command = touch ${buildout:directory}/instance.cfg
[versions]
plone.recipe.command = 1.1
''').encode())
f.flush()
self.standalone.supply(f.name)
self.standalone.waitForSoftware()
software_hash = hashlib.md5(f.name.encode()).hexdigest()
software_installation_path = os.path.join(
self.standalone.software_directory, software_hash)
# this produced reports
nxdbom_txt = os.path.join(software_installation_path, 'nxdbom.txt')
with open(nxdbom_txt) as f:
self.assertIn('https://pypi.org/project/plone.recipe.command/1.1/', f.read())
with open(os.path.join(software_installation_path, 'nxdbom.cdx.json')) as f:
cdx = json.load(f)
self.assertIn(
'pkg:pypi/plone.recipe.command@1.1',
[c['purl'] for c in cdx['components']])
nxdbom_txt_mtime = os.stat(nxdbom_txt).st_mtime
# reports are only produced when software is installed
self.standalone.waitForSoftware()
self.assertEqual(os.stat(nxdbom_txt).st_mtime, nxdbom_txt_mtime)
os.utime(os.path.join(software_installation_path, '.completed'))
self.standalone.waitForSoftware()
self.assertGreater(os.stat(nxdbom_txt).st_mtime, nxdbom_txt_mtime)
class TestSlapOSStandaloneInstance(SlapOSStandaloneTestCase): class TestSlapOSStandaloneInstance(SlapOSStandaloneTestCase):
def test_request_instance(self): def test_request_instance(self):
with tempfile.NamedTemporaryFile(suffix="-%s.cfg" % self.id()) as f: with tempfile.NamedTemporaryFile(suffix="-%s.cfg" % self.id()) as f:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment