Commit 0caca582 authored by Jérome Perrin's avatar Jérome Perrin

utils: introduce helper classes to manage software and instance schemas

parent 6ad34327
......@@ -74,6 +74,7 @@ setup(name=name,
'six',
'cachecontrol',
'lockfile',
'jsonschema',
'uritemplate', # used by hateoas navigator
'subprocess32; python_version<"3"',
'ipaddress; python_version<"3"', # used by whitelistfirewall
......
......@@ -27,19 +27,32 @@
#
##############################################################################
import enum
import errno
import hashlib
import json
import os
import shutil
import socket
import sqlite3
import struct
import subprocess
import sqlite3
from xml_marshaller.xml_marshaller import Marshaller, Unmarshaller
from lxml import etree
import warnings
import jsonschema
import netaddr
import requests
import six
from lxml import etree
from six.moves.urllib import parse
import hashlib
import netaddr
import shutil
from six.moves.urllib_parse import urljoin
from xml_marshaller.xml_marshaller import Marshaller, Unmarshaller
try:
from typing import Dict, Optional, IO
except ImportError:
pass
try:
......@@ -295,3 +308,143 @@ def rmtree(path):
raise e # XXX make pylint happy
shutil.rmtree(path, onerror=chmod_retry)
def _readAsJson(url):
# type: (str) -> Optional[Dict]
"""Reads and parse the json file located at `url`.
`url` can also be the path of a local file.
"""
if url.startswith('file://'):
url = url[len('file://'):]
path = url if os.path.exists(url) else None
if path:
with open(path) as f:
try:
return json.load(f)
except ValueError:
return None
if url.startswith('http://') or url.startswith('https://'):
try:
r = requests.get(url)
r.raise_for_status()
return r.json()
except (requests.exceptions.HTTPError, ValueError):
return None
return None
class SoftwareReleaseSerialisation(str, enum.Enum):
Xml = 'xml'
JsonInXml = 'json-in-xml'
class SoftwareReleaseSchema(object):
def __init__(self, software_url, software_type):
# type: (str, Optional[str]) -> None
self.software_url = software_url
self.software_type = software_type
def getSoftwareSchema(self):
# type: () -> Optional[Dict]
"""Returns the schema for this software.
"""
return _readAsJson(self.software_url + '.json')
def getSoftwareTypeSchema(self):
# type: () -> Optional[Dict]
"""Returns schema for this software type.
"""
software_schema = self.getSoftwareSchema()
if software_schema is None:
return None
software_type = self.software_type
from slapos.slap.slap import DEFAULT_SOFTWARE_TYPE # TODO
if software_type is None:
software_type = DEFAULT_SOFTWARE_TYPE
# XXX some software are using "default" for default software type
if software_type == DEFAULT_SOFTWARE_TYPE \
and software_type not in software_schema['software-type'] \
and 'default' in software_schema['software-type']:
warnings.warn(
"Software release {} does not have schema for DEFAULT_SOFTWARE_TYPE but has one for 'default'."
" Using 'default' instead.".format(self.software_url),
UserWarning,
)
software_type = 'default'
return software_schema['software-type'].get(software_type)
def getSerialisation(self):
# type: () -> Optional[SoftwareReleaseSerialisation]
"""Returns the serialisation method used for parameters.
"""
software_schema = self.getSoftwareSchema()
if software_schema is None:
return None
return SoftwareReleaseSerialisation(software_schema['serialisation'])
def getInstanceRequestParameterSchemaURL(self):
# type: () -> Optional[str]
"""Returns the URL of the schema defining instance parameters.
"""
software_type_schema = self.getSoftwareTypeSchema()
if software_type_schema is None:
return None
software_url = self.software_url
if os.path.exists(software_url):
software_url = 'file://' + software_url
return urljoin(software_url, software_type_schema['request'])
def getInstanceRequestParameterSchema(self):
# type: () -> Optional[Dict]
"""Returns the schema defining instance parameters.
"""
instance_parameter_schema_url = self.getInstanceRequestParameterSchemaURL()
if instance_parameter_schema_url is None:
return None
schema = _readAsJson(instance_parameter_schema_url)
if schema:
# so that jsonschema knows how to resolve references
schema.setdefault('$id', instance_parameter_schema_url)
return schema
def getInstanceConnectionParameterSchemaURL(self):
# type: () -> Optional[str]
"""Returns the URL of the schema defining connection parameters published by the instance.
"""
software_type_schema = self.getSoftwareTypeSchema()
if software_type_schema is None:
return None
return urljoin(self.software_url, software_type_schema['response'])
def getInstanceConnectionParameterSchema(self):
# type: () -> Optional[Dict]
"""Returns the schema defining connection parameters published by the instance.
"""
instance_parameter_schema_url = self.getInstanceConnectionParameterSchemaURL()
if instance_parameter_schema_url is None:
return None
schema = _readAsJson(instance_parameter_schema_url)
if schema:
# so that jsonschema knows how to resolve references
schema.setdefault('$id', instance_parameter_schema_url)
return schema
def validateInstanceParameterDict(self, parameter_dict):
# type: (Dict) -> None
"""Validate instance parameters against the software schema.
Raise jsonschema.ValidationError if parameters does not validate.
"""
schema_url = self.getInstanceRequestParameterSchemaURL()
if schema_url:
instance = parameter_dict if self.getSerialisation() == SoftwareReleaseSerialisation.Xml else json.loads(parameter_dict['_'])
instance.pop('$schema', None)
jsonschema.validate(
instance=instance,
schema=self.getInstanceRequestParameterSchema(),
)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment