Commit f62e3fc4 authored by Jérome Perrin's avatar Jérome Perrin

Libreoffice 7.5.2.2 / Python 3

See merge request !32
parents 27d76c56 98055878
Pipeline #28699 failed with stage
in 0 seconds
.. warning:: this documentation is outdated. See https://lab.nexedi.com/nexedi/cloudooo , https://lab.nexedi.com/nexedi/slapos/tree/master/software/cloudooo or https://cloudooo.nexedi.com/ for more up to date information.
Install Cloudooo
================
::
$ python2.6 setup.py install
Warnings:
- you must have installed setuptools>=0.6c11 in this python.
$ python setup.py install
Install LibreOffice / OpenOffice.org
====================================
......
......@@ -2,11 +2,11 @@
import unittest
import sys
from base64 import encodestring
from xmlrpclib import ServerProxy
from base64 import encodebytes
from xmlrpc.client import ServerProxy
from getopt import getopt, GetoptError
DOCUMENT_STRING = """MemoryMonitor - TimeoutMonitor -
DOCUMENT_STRING = b"""MemoryMonitor - TimeoutMonitor -
RequestMonitor\n\nOOHandler\n\nMimemapper\n\nERP5\n"""
HOSTNAME = PORT = None
......@@ -15,17 +15,17 @@ class CloudoooTestCase(unittest.TestCase):
""" """
def setUp(self):
self.proxy_address = "http://%s:%s" % (HOSTNAME, PORT)
self.proxy_address = f"http://{HOSTNAME}:{PORT}"
def test_run_generate(self):
data = encodestring(DOCUMENT_STRING)
data = encodebytes(DOCUMENT_STRING)
proxy = ServerProxy(self.proxy_address, allow_none=True)
res = proxy.run_generate("t.text", data, None, 'pdf', 'text/plain')
self.assertEqual(res[1]['mime'], "application/pdf")
self.assertEqual(res[0], 200)
def test_set_metadata(self):
data = encodestring(DOCUMENT_STRING)
data = encodebytes(DOCUMENT_STRING)
proxy = ServerProxy(self.proxy_address, allow_none=True)
odt_data = proxy.convertFile(data, 'txt', 'odt')
metadata_dict = proxy.getFileMetadataItemList(odt_data, 'odt')
......@@ -33,7 +33,7 @@ class CloudoooTestCase(unittest.TestCase):
'application/vnd.oasis.opendocument.text')
res = proxy.run_setmetadata("t.odt", odt_data, {"Title": "test"})
self.assertEqual(res[0], 200)
response_code, response_dict, response_message = \
response_code, response_dict, _ = \
proxy.run_convert("t.odt", res[1]['data'])
self.assertEqual(response_code, 200)
self.assertEqual(response_dict['meta']['Title'], "test")
......@@ -45,7 +45,7 @@ def main():
opt_list, _ = getopt(sys.argv[1:], "",
["port=", "hostname="])
except GetoptError as e:
print >> sys.stderr, "%s \nUse --port and --hostname" % e
print("%s \nUse --port and --hostname" % e, file=sys.stderr)
sys.exit(2)
for opt, arg in opt_list:
......@@ -55,7 +55,7 @@ def main():
HOSTNAME = arg
if not HOSTNAME and not PORT:
print >> sys.stderr, "Use --port and --hostname"
print("Use --port and --hostname", file=sys.stderr)
sys.exit(2)
suite = unittest.TestLoader().loadTestsFromTestCase(CloudoooTestCase)
unittest.TextTestRunner(verbosity=2).run(suite)
......@@ -6,4 +6,4 @@ def main():
cloudooo_conf_path = pkg_resources.resource_filename("cloudooo",
path.join("sample", "cloudooo.conf.in"))
print open(cloudooo_conf_path).read()
print(open(cloudooo_conf_path).read())
......@@ -39,12 +39,12 @@ from cloudooo.interfaces.file import IFile
@implementer(IFile)
class File(object):
class File:
"""File is used to manipulate one temporary file
stored into the filesystem.
"""
def __init__(self, base_folder_url, data, source_format):
def __init__(self, base_folder_url:str, data:bytes, source_format:str):
"""Create an file into file system and store the URL.
Keyword arguments:
base_folder_url -- Full path to create a temporary folder
......@@ -60,7 +60,7 @@ class File(object):
def _createDirectory(self):
return tempfile.mkdtemp(dir=self.base_folder_url)
def load(self):
def load(self) -> str:
"""Creates one Temporary Document and write data into it.
Return the url for the document.
"""
......@@ -68,7 +68,8 @@ class File(object):
file_path = tempfile.mktemp(suffix=".%s" % self.source_format,
dir=self.directory_name)
# stores the data in temporary file
open(file_path, 'wb').write(self.original_data)
with open(file_path, 'wb') as f:
f.write(self.original_data)
# If is a zipfile is need extract all files from whitin the compressed file
if is_zipfile(file_path):
zipfile = ZipFile(file_path)
......@@ -91,7 +92,7 @@ class File(object):
remove(zipfile_path)
return file_path
def getContent(self, zip=False):
def getContent(self, zip=False) -> bytes:
"""Open the file and returns the content.
Keyword Arguments:
zip -- Boolean attribute. If True"""
......@@ -106,14 +107,16 @@ class File(object):
zipfile.write(file)
finally:
zipfile.close()
opened_zip = open(zip_path, 'r').read()
with open(zip_path, 'rb') as f:
opened_zip = f.read()
remove(zip_path)
chdir(current_dir_url)
return opened_zip
else:
return open(self.url, 'r').read()
with open(self.url, 'rb') as f:
return f.read()
def getUrl(self):
def getUrl(self) -> str:
"""Returns full path."""
return self.url
......
......@@ -36,7 +36,7 @@ from subprocess import Popen, PIPE
from tempfile import mktemp
@implementer(IHandler)
class Handler(object):
class Handler:
"""FFMPEG Handler is used to handler inputed audio and video files"""
def __init__(self, base_folder_url, data, source_format, **kw):
......@@ -56,7 +56,7 @@ class Handler(object):
# XXX This implementation could use ffmpeg -i pipe:0, but
# XXX seems super unreliable currently and it generates currupted files in
# the end
logger.debug("FfmpegConvert: %s > %s" % (self.input.source_format, destination_format))
logger.debug("FfmpegConvert: %s > %s", self.input.source_format, destination_format)
output_url = mktemp(suffix=".%s" % destination_format,
dir=self.input.directory_name)
command = ["ffmpeg",
......
......@@ -30,7 +30,7 @@
from os.path import join
from cloudooo.tests.cloudoooTestCase import TestCase
from cloudooo.tests.backportUnittest import skip
from unittest import skip
class TestAllSupportedFormat(TestCase):
......
......@@ -44,14 +44,17 @@ class TestServer(TestCase):
self.runConversionList(self.ConversionScenarioList())
def FaultConversionScenarioList(self):
return [
# Test to verify if server fail when a empty string is sent
('', '', ''),
# Try convert one video for a invalid format
(open(join('data', 'test.ogv')).read(), 'ogv', 'xyz'),
# Try convert one video to format not possible
(open(join('data', 'test.ogv')).read(), 'ogv', 'moov'),
]
scenario_list = [
# Test to verify if server fail when a empty file is sent
(b'', '', ''),
]
# Try convert one video for a invalid format
with open(join('data', 'test.ogv'), 'rb') as f:
scenario_list.append((f.read(), 'ogv', 'xyz'))
# Try convert one video to format not possible
with open(join('data', 'test.ogv'), 'rb') as f:
scenario_list.append((f.read(), 'ogv', 'moov'))
return scenario_list
def testFaultConversion(self):
"""Test fail convertion of Invalid video files"""
......
......@@ -38,7 +38,7 @@ from tempfile import mktemp
@implementer(IHandler)
class Handler(object):
class Handler:
"""ImageMagic Handler is used to handler images."""
def __init__(self, base_folder_url, data, source_format, **kw):
......@@ -49,7 +49,7 @@ class Handler(object):
def convert(self, destination_format=None, **kw):
"""Convert a image"""
logger.debug("ImageMagickConvert: %s > %s" % (self.file.source_format, destination_format))
logger.debug("ImageMagickConvert: %s > %s", self.file.source_format, destination_format)
output_url = mktemp(suffix='.%s' % destination_format,
dir=self.base_folder_url)
command = ["convert", self.file.getUrl(), output_url]
......@@ -73,6 +73,7 @@ class Handler(object):
stdout=PIPE,
stderr=PIPE,
close_fds=True,
text=True,
env=self.environment).communicate()
self.file.trash()
metadata_dict = {}
......@@ -80,7 +81,7 @@ class Handler(object):
std = std.strip()
if re.search("^[a-zA-Z]", std):
if std.count(":") > 1:
key, value = re.compile(".*\:\ ").split(std)
key, value = re.compile(r".*\:\ ").split(std)
else:
key, value = std.split(":")
metadata_dict[key] = value.strip()
......
......@@ -41,7 +41,8 @@ class TestHandler(HandlerTestCase):
def testConvertPNGtoJPG(self):
"""Test conversion of png to jpg"""
png_file = open("data/test.png").read()
with open("data/test.png", "rb") as f:
png_file = f.read()
handler = Handler(self.tmp_url, png_file, "png", **self.kw)
jpg_file = handler.convert("jpg")
mime = magic.Magic(mime=True)
......@@ -50,7 +51,8 @@ class TestHandler(HandlerTestCase):
def testgetMetadataFromImage(self):
"""Test if metadata is extracted from image correctly"""
png_file = open("data/test.png").read()
with open("data/test.png", "rb") as f:
png_file = f.read()
handler = Handler(self.tmp_url, png_file, "png", **self.kw)
metadata = handler.getMetadata()
self.assertEqual(metadata.get("Compression"), "Zip")
......@@ -59,7 +61,7 @@ class TestHandler(HandlerTestCase):
def testsetMetadata(self):
""" Test if metadata are inserted correclty """
handler = Handler(self.tmp_url, "", "png", **self.kw)
handler = Handler(self.tmp_url, b"", "png", **self.kw)
self.assertRaises(NotImplementedError, handler.setMetadata)
......@@ -44,14 +44,17 @@ class TestServer(TestCase):
self.runConversionList(self.ConversionScenarioList())
def FaultConversionScenarioList(self):
return [
# Test to verify if server fail when a empty string is sent
('', '', ''),
# Try convert one video for a invalid format
(open(join('data', 'test.png')).read(), 'png', 'xyz'),
# Try convert one video to format not possible
(open(join('data', 'test.png')).read(), 'png', '8bim'),
]
scenario_list = [
# Test to verify if server fail when a empty file is sent
(b'', '', ''),
]
# Try convert one png for a invalid format
with open(join('data', 'test.png'), 'rb') as f:
scenario_list.append((f.read(), 'png', 'xyz'))
# Try convert one png to format not possible
with open(join('data', 'test.png'), 'rb') as f:
scenario_list.append((f.read(), 'png', '8bim'))
return scenario_list
def testFaultConversion(self):
"""Test fail convertion of Invalid image files"""
......
......@@ -36,7 +36,7 @@ from psutil import pid_exists, Process, AccessDenied
@implementer(IApplication)
class Application(object):
class Application:
"""Base object to create an object that is possible manipulation a
process"""
......@@ -44,15 +44,17 @@ class Application(object):
def start(self, init=True):
"""Start Application"""
logger.debug("Process Started %s, Port %s. Pid %s" % (self.name,
self.getAddress()[-1],
self.pid()))
logger.debug(
"Process Started %s, Port %s. Pid %s",
self.name,
self.getAddress()[-1],
self.pid())
def stop(self):
"""Stop the process"""
if hasattr(self, 'process') and self.status():
process_pid = self.process.pid
logger.debug("Stop Pid - %s" % process_pid)
logger.debug("Stop Pid - %s", process_pid)
try:
self.process.terminate()
waitStopDaemon(self, self.timeout)
......@@ -104,3 +106,9 @@ class Application(object):
if not hasattr(self, 'process'):
return None
return self.process.pid
def hasExited(self):
"""Check if process has exited running"""
if not hasattr(self, 'process'):
return True
return self.process.poll() is not None
......@@ -35,7 +35,7 @@ from psutil import AccessDenied, NoSuchProcess
from os.path import exists, join
from threading import Lock
from zope.interface import implementer
from application import Application
from .application import Application
from cloudooo.interfaces.lockable import ILockable
from cloudooo.util import logger
from cloudooo.handler.ooo.util import waitStartDaemon, \
......@@ -60,8 +60,10 @@ class OpenOffice(Application):
def _testOpenOffice(self, host, port):
"""Test if OpenOffice was started correctly"""
logger.debug("Test OpenOffice %s - Pid %s" % (self.getAddress()[-1],
self.pid()))
logger.debug(
"Test OpenOffice %s - Pid %s",
self.getAddress()[-1],
self.pid())
python = join(self.office_binary_path, "python")
args = [exists(python) and python or "python3",
pkg_resources.resource_filename("cloudooo",
......@@ -70,14 +72,14 @@ class OpenOffice(Application):
"--hostname=%s" % host,
"--port=%s" % port,
"--uno_path=%s" % self.uno_path]
logger.debug("Testing Openoffice Instance %s" % port)
logger.debug("Testing Openoffice Instance %s", port)
try:
subprocess.check_output(args, stderr=subprocess.STDOUT, close_fds=True)
except subprocess.CalledProcessError as e:
logger.warning(e.output)
return False
else:
logger.debug("Instance %s works" % port)
logger.debug("Instance %s works", port)
return True
def _cleanRequest(self):
......@@ -153,12 +155,12 @@ class OpenOffice(Application):
env["TMPDIR"] = self.path_user_installation
self._startProcess(self.command, env)
self._cleanRequest()
Application.start(self)
super().start()
def stop(self):
"""Stop the instance by pid. By the default
the signal is 15."""
Application.stop(self)
super().stop()
if socketStatus(self.hostname, self.port):
self._releaseOpenOfficePort()
self._cleanRequest()
......@@ -174,7 +176,7 @@ class OpenOffice(Application):
def release(self):
"""Unlock Instance."""
logger.debug("OpenOffice %s, %s unlocked" % self.getAddress())
logger.debug("OpenOffice %s, %s unlocked", *self.getAddress())
self._lock.release()
openoffice = OpenOffice()
......@@ -30,7 +30,7 @@
from zope.interface import implementer
from zipfile import ZipFile
from StringIO import StringIO
from io import BytesIO
from lxml import etree
from cloudooo.interfaces.file import IOdfDocument
from cloudooo.file import File
......@@ -41,9 +41,10 @@ class FileSystemDocument(File):
@implementer(IOdfDocument)
class OdfDocument(object):
class OdfDocument:
"""Manipulates odf documents in memory"""
def __init__(self, data, source_format):
"""Open the the file in memory.
......@@ -51,22 +52,22 @@ class OdfDocument(object):
data -- Content of the document
source_format -- Document Extension
"""
self._zipfile = ZipFile(StringIO(data))
self._zipfile = ZipFile(BytesIO(data))
self.source_format = source_format
# XXX - Maybe parsed_content should not be here, but on OOGranulate
self.parsed_content = etree.fromstring(self.getContentXml())
def getContentXml(self):
"""Returns the content.xml file as string"""
def getContentXml(self) -> bytes:
"""Returns the content.xml file as bytes"""
return self._zipfile.read('content.xml')
def getFile(self, path):
"""If exists, returns file as string, else return an empty string"""
def getFile(self, path:str) -> bytes:
"""If exists, returns file as bytes, otherwise b''"""
try:
return self._zipfile.read(path)
except KeyError:
return ''
return b''
def trash(self):
"""Remove the file in memory."""
......
......@@ -33,9 +33,10 @@ from cloudooo.interfaces.filter import IFilter
@implementer(IFilter)
class Filter(object):
class Filter:
"""Filter of OOo."""
def __init__(self, extension, filter, mimetype, document_service, **kwargs):
"""Receives extension, filter and mimetype of filter and saves in object.
"""
......
......@@ -29,7 +29,7 @@
##############################################################################
from zipfile import ZipFile
from StringIO import StringIO
from io import BytesIO
from lxml import etree
from os import path
from cloudooo.util import logger
......@@ -61,17 +61,17 @@ def getTemplatePath(format):
return path.join(path.dirname(__file__), 'template.%s' % format)
class OOGranulator(object):
class OOGranulator:
"""Granulate an OpenOffice document into tables, images, chapters and
paragraphs."""
def __init__(self, file, source_format):
def __init__(self, file:bytes, source_format:str):
self.document = OdfDocument(file, source_format)
def _odfWithoutContentXml(self, format='odt'):
"""Returns an odf document without content.xml
It is a way to escape from this issue: http://bugs.python.org/issue6818"""
new_odf_document = ZipFile(StringIO(), 'a')
new_odf_document = ZipFile(BytesIO(), 'a')
template_path = getTemplatePath(format)
template_file = ZipFile(template_path)
for item in template_file.filelist:
......@@ -202,7 +202,7 @@ class OOGranulator(object):
image_list = []
for xml_image in xml_image_list:
id = xml_image.values()[0].split('/')[-1]
id = list(xml_image.values())[0].split('/')[-1]
title = ''.join(xml_image.xpath(IMAGE_TITLE_XPATH_QUERY%(stylename_list[0], name_list[0], id),
namespaces=xml_image.nsmap))
if title != '':
......@@ -250,9 +250,9 @@ class OOGranulator(object):
text = ''.join(paragraph.itertext())
if TEXT_ATTRIB_STYLENAME not in paragraph.attrib.keys():
logger.error("Unable to find %s attribute at paragraph %s " % \
(TEXT_ATTRIB_STYLENAME, paragraph_id))
if TEXT_ATTRIB_STYLENAME not in paragraph.attrib:
logger.error("Unable to find %s attribute at paragraph %s ",
TEXT_ATTRIB_STYLENAME, paragraph_id)
return None
p_class = paragraph.attrib[TEXT_ATTRIB_STYLENAME]
......@@ -262,22 +262,17 @@ class OOGranulator(object):
chapter_list = self.document.parsed_content.xpath(
CHAPTER_XPATH_QUERY,
namespaces=self.document.parsed_content.nsmap)
return chapter_list
return [str(x) for x in chapter_list]
def getChapterItemList(self):
"""Returns the list of chapters in the form of (id, level)."""
id = 0
chapter_list = []
for chapter in self._getChapterList():
chapter_list.append((id, chapter.encode('utf-8')))
id += 1
return chapter_list
return list(enumerate(self._getChapterList()))
def getChapterItem(self, chapter_id):
"""Return the chapter in the form of (title, level)."""
chapter_list = self._getChapterList()
try:
chapter = chapter_list[chapter_id].encode('utf-8')
chapter = chapter_list[chapter_id]
return [chapter, chapter_id]
except IndexError:
msg = "Unable to find chapter %s at chapter list." % chapter_id
......
......@@ -31,7 +31,7 @@
import json
import pkg_resources
import mimetypes
from base64 import decodestring, encodestring
from base64 import decodebytes, encodebytes
from os import environ, path
from subprocess import Popen, PIPE
from cloudooo.handler.ooo.application.openoffice import openoffice
......@@ -46,7 +46,7 @@ from psutil import pid_exists
@implementer(IHandler)
class Handler(object):
class Handler:
"""OOO Handler is used to access the one Document and OpenOffice.
For each Document inputed is created on instance of this class to manipulate
the document. This Document must be able to create and remove a temporary
......@@ -76,9 +76,9 @@ class Handler(object):
# backward compatibility.
# The heuristic is "if it's not utf-8", let's assume it's iso-8859-15.
try:
unicode(data, 'utf-8')
data.decode('utf-8')
except UnicodeDecodeError:
data = unicode(data, 'iso-8859-15').encode('utf-8')
data = data.decode('iso-8859-15').encode('utf-8')
logger.warn("csv data is not utf-8, assuming iso-8859-15")
self.document = FileSystemDocument(
base_folder_url,
......@@ -99,7 +99,7 @@ class Handler(object):
'--document_url=%s' % self.document.getUrl()]
for arg in args:
command_list.insert(3, "--%s" % arg)
for k, v in kw.iteritems():
for k, v in kw.items():
command_list.append("--%s=%s" % (k, v))
return command_list
......@@ -138,16 +138,16 @@ class Handler(object):
stdout, stderr = self._subprocess(command_list)
if not stdout and stderr:
first_error = stderr
logger.error(stderr)
logger.error(stderr.decode())
self.document.restoreOriginal()
openoffice.restart()
kw['document_url'] = self.document.getUrl()
command = self._getCommand(*feature_list, **kw)
stdout, stderr = self._subprocess(command)
if not stdout and stderr:
second_error = "\nerror of the second run: " + stderr
logger.error(second_error)
raise Exception(first_error + second_error)
second_error = b"\nerror of the second run: " + stderr
logger.error(second_error.decode())
raise Exception((first_error + second_error).decode(errors='replace'))
return stdout, stderr
......@@ -165,7 +165,7 @@ class Handler(object):
filter_list.append((destination_extension,
service_type,
mimemapper.getFilterName(destination_extension, service_type)))
logger.debug("Filter List: %r" % filter_list)
logger.debug("Filter List: %r", filter_list)
return json.dumps(dict(doc_type_list_by_extension=mimemapper._doc_type_list_by_extension,
filter_list=filter_list,
mimetype_by_filter_type=mimemapper._mimetype_by_filter_type))
......@@ -177,7 +177,7 @@ class Handler(object):
"""
if not self.enable_scripting and kw.get('script'):
raise Exception("ooo: scripting is disabled")
logger.debug("OooConvert: %s > %s" % (self.source_format, destination_format))
logger.debug("OooConvert: %s > %s", self.source_format, destination_format)
kw['source_format'] = self.source_format
if destination_format:
kw['destination_format'] = destination_format
......@@ -186,10 +186,10 @@ class Handler(object):
kw['refresh'] = json.dumps(self.refresh)
openoffice.acquire()
try:
stdout, stderr = self._callUnoConverter(*['convert'], **kw)
stdout, _ = self._callUnoConverter(*['convert'], **kw)
finally:
openoffice.release()
url = stdout.replace('\n', '')
url = stdout.replace(b'\n', b'')
self.document.reload(url)
content = self.document.getContent(self.zip)
self.document.trash()
......@@ -198,8 +198,8 @@ class Handler(object):
def getMetadata(self, base_document=False):
"""Returns a dictionary with all metadata of document.
Keywords Arguments:
base_document -- Boolean variable. if true, the document is also returned
along with the metadata."""
base_document -- Boolean variable. if true, the document content (as bytes)
is also returned along with the metadata."""
logger.debug("getMetadata")
kw = dict(mimemapper=self._serializeMimemapper())
if base_document:
......@@ -208,10 +208,10 @@ class Handler(object):
feature_list = ['getmetadata']
openoffice.acquire()
try:
stdout, stderr = self._callUnoConverter(*feature_list, **kw)
stdout, _ = self._callUnoConverter(*feature_list, **kw)
finally:
openoffice.release()
metadata = json.loads(decodestring(stdout))
metadata = json.loads(decodebytes(stdout))
if 'document_url' in metadata:
self.document.reload(metadata['document_url'])
metadata['Data'] = self.document.getContent()
......@@ -224,12 +224,11 @@ class Handler(object):
Keyword arguments:
metadata -- expected an dictionary with metadata.
"""
metadata_pickled = json.dumps(metadata)
logger.debug("setMetadata")
kw = dict(metadata=encodestring(metadata_pickled))
metadata_pickled = json.dumps(metadata).encode()
kw = dict(metadata=encodebytes(metadata_pickled).decode())
openoffice.acquire()
try:
stdout, stderr = self._callUnoConverter(*['setmetadata'], **kw)
self._callUnoConverter(*['setmetadata'], **kw)
finally:
openoffice.release()
doc_loaded = self.document.getContent()
......@@ -249,7 +248,7 @@ class Handler(object):
# XXX please never guess extension from mimetype
output_set = set()
if "/" in source_mimetype:
parsed_mimetype_type = parseContentType(source_mimetype).gettype()
parsed_mimetype_type = parseContentType(source_mimetype).get_content_type()
# here `guess_all_extensions` never handles mimetype parameters
# (even for `text/plain;charset=UTF-8` which is standard)
extension_list = mimetypes.guess_all_extensions(parsed_mimetype_type) # XXX never guess
......
......@@ -40,11 +40,6 @@ from base64 import b64encode, b64decode
from functools import partial
from getopt import getopt, GetoptError
try:
basestring
except NameError:
basestring = str
__doc__ = """
usage: unodocument [options]
......@@ -81,7 +76,17 @@ Options:
"""
class UnoDocument(object):
MARKER = []
def next(gen, default=MARKER):
try:
return gen.__next__()
except StopIteration:
if default is MARKER:
raise
return default
class UnoDocument:
"""A module to easily work with OpenOffice.org."""
def __init__(self, service_manager, document_url,
......@@ -242,9 +247,9 @@ class UnoDocument(object):
continue
property_value = getattr(container, property_name, '')
if property_value:
if isinstance(property_value, basestring):
if isinstance(property_value, str):
metadata[property_name] = property_value
elif isinstance(property_value, tuple) and isinstance(property_value[0], basestring):
elif isinstance(property_value, tuple) and isinstance(property_value[0], str):
metadata[property_name] = property_value
else:
try:
......@@ -284,7 +289,7 @@ class UnoDocument(object):
if isinstance(current_value, tuple):
if isinstance(value, list):
value = tuple(value)
elif isinstance(value, basestring):
elif isinstance(value, str):
# BBB: old ERP5 code sends Keywords as a string
# separated by a whitespace.
value = tuple(value.split(' '))
......@@ -294,7 +299,7 @@ class UnoDocument(object):
else:
new_properties.append([prop, value])
for prop, value in new_properties:
if isinstance(value, basestring):
if isinstance(value, str):
user_defined_properties.addProperty(prop, 0, '')
user_defined_properties.setPropertyValue(prop, value)
self.document_loaded.store()
......@@ -311,7 +316,7 @@ def main():
help_msg = "\nUse --help or -h\n"
try:
opt_list, arg_list = getopt(sys.argv[1:], "h", ["help", "test",
opt_list, _ = getopt(sys.argv[1:], "h", ["help", "test",
"convert", "getmetadata", "setmetadata",
"uno_path=", "office_binary_path=",
"hostname=", "port=", "source_format=",
......@@ -325,10 +330,8 @@ def main():
param_list = [tuple[0] for tuple in iter(opt_list)]
try:
import json
except ImportError:
import simplejson as json
import json
metadata = mimemapper = script = None
hostname = port = office_binary_path = uno_path = None
document_url = destination_format = source_format = infilter = refresh = None
......
......@@ -30,23 +30,10 @@
##############################################################################
import sys
try:
import json
except ImportError:
import simplejson as json
import json
import helper_util
from getopt import getopt, GetoptError
# python3 support
try:
basestring
except NameError:
basestring = str
try:
long
except NameError:
long = int
__doc__ = """
......@@ -65,7 +52,7 @@ Options:
"""
class UnoMimemapper(object):
class UnoMimemapper:
""" """
def __init__(self, hostname, port, uno_path=None, office_binary_path=None):
......@@ -74,7 +61,7 @@ class UnoMimemapper(object):
uno_path,
office_binary_path)
def _getElementNameByService(self, uno_service, ignore_name_list=[]):
def _getElementNameByService(self, uno_service, ignore_name_list):
"""Returns an dict with elements."""
name_list = uno_service.getElementNames()
service_dict = {}
......@@ -84,7 +71,7 @@ class UnoMimemapper(object):
for obj in iter(element_list):
if obj.Name in ignore_name_list:
continue
if not isinstance(obj.Value, (bool, int, long, basestring, tuple)):
if not isinstance(obj.Value, (bool, int, str, tuple)):
continue
element_dict[obj.Name] = obj.Value
service_dict[name] = element_dict
......
......@@ -33,15 +33,14 @@ from re import findall
from subprocess import Popen, PIPE
from subprocess import STDOUT
from zope.interface import implementer
from filter import Filter
from .filter import Filter
from os import environ, path
from cloudooo.interfaces.mimemapper import IMimemapper
from types import InstanceType
import json
@implementer(IMimemapper)
class MimeMapper(object):
class MimeMapper:
"""Load all filters from OOo. You can get the filter you want or all
filters of the specific extension.
"""
......@@ -64,27 +63,10 @@ class MimeMapper(object):
def _typeToDocumentService(self, document_type):
"""Returns the document service according to document type."""
for k, v in self._document_type_dict.iteritems():
for k, v in self._document_type_dict.items():
if k.startswith(document_type):
return v
def _getElementNameByService(self, uno_service, ignore_name_list=[]):
"""Returns an dict with elements."""
name_list = uno_service.getElementNames()
service_dict = {}
for name in iter(name_list):
element_dict = {}
element_list = uno_service.getByName(name)
for obj in iter(element_list):
if obj.Name in ignore_name_list:
continue
elif type(obj.Value) == InstanceType:
continue
element_dict[obj.Name] = obj.Value
service_dict[name] = element_dict
return service_dict
def isLoaded(self):
"""Verify if filters were loaded"""
return self._loaded
......@@ -103,6 +85,7 @@ class MimeMapper(object):
alternative_extension_dict = {
'Microsoft Excel 2007 XML':'ms.xlsx',
'Microsoft Excel 2007-2013 XML':'ms.xlsx',
'Excel 2007–365':'ms.xlsx',
'Microsoft Excel 5.0':'5.xls',
'Microsoft Excel 95':'95.xls',
'Microsoft PowerPoint 2007 XML AutoPlay':'ms.ppsx',
......@@ -111,8 +94,10 @@ class MimeMapper(object):
'Microsoft PowerPoint 2007-2013 XML':'ms.pptx',
'Microsoft Word 2007 XML':'ms.docx',
'Microsoft Word 2007-2013 XML':'ms.docx',
'Word 2007–365':'ms.docx',
'Microsoft Word 6.0':'6.doc',
'Microsoft Word 95':'95.doc',
'TIFF - Tagged Image File Format': 'tiff',
}
uno_path = kw.get("uno_path", environ.get('uno_path'))
office_binary_path = kw.get("office_binary_path",
......@@ -133,21 +118,24 @@ class MimeMapper(object):
filter_dict, type_dict = json.loads(stdout)
ooo_disable_filter_list = kw.get("ooo_disable_filter_list") or [] + [
# 'writer_jpg_Export', # Seems not working from cloudooo in Libre Office 4.3.3.2
# 'writer_png_Export', # Seems not working from cloudooo in Libre Office 4.3.3.2
# 'draw_eps_Export', # Seems not working from cloudooo in Libre Office 5.0.0.5
# 'impress_eps_Export', # Seems not working from cloudooo in Libre Office 5.0.0.5
# https://bugs.documentfoundation.org/show_bug.cgi?id=117252
'writer_web_jpg_Export',
'writer_web_png_Export',
'writer_web_webp_Export',
]
ooo_disable_filter_name_list = kw.get("ooo_disable_filter_name_list") or [] + [
'Text', # Use 'Text - Choose Encoding' instead
'Text (StarWriter/Web)', # Use 'Text - Choose Encoding (Writer/Web)' instead
'ODF Drawing (Impress)', # broken for presentation
]
for filter_name, value in filter_dict.iteritems():
for filter_name, value in filter_dict.items():
if filter_name in ooo_disable_filter_list:
continue
ui_name = value.get('UIName')
filter_type = value.get('Type')
filter_type_dict = type_dict.get(filter_type)
if not filter_type_dict:
continue
if not ui_name:
ui_name = filter_type_dict.get("UIName")
if ui_name in ooo_disable_filter_name_list or 'Template' in ui_name:
......@@ -229,7 +217,7 @@ class MimeMapper(object):
'pdf': ['com.sun.star.drawing.DrawingDocument'],
'xls': ['com.sun.star.sheet.SpreadsheetDocument'],
})
self.document_service_list = self._extension_list_by_type.keys()
self.document_service_list = list(self._extension_list_by_type.keys())
self._loaded = True
def getFilterName(self, extension, document_service):
......@@ -244,11 +232,11 @@ class MimeMapper(object):
filter_list = [filter for filter in self.getFilterList(extension) \
if filter.getDocumentService() == document_service]
if len(filter_list) > 1:
for filter in iter(filter_list):
for filter in filter_list:
if filter.isPreferred():
return filter.getName()
else:
for filter in iter(filter_list):
for filter in filter_list:
if filter.getName().endswith("Export"):
return filter.getName()
filter_list.sort(key=lambda x: x.getSortIndex())
......@@ -290,10 +278,10 @@ class MimeMapper(object):
allowed_extension_list.extend(self._extension_list_by_type.get(document_type, []))
# gets list of extensions of each document type if document_type_list isn't
# empty.
for type in iter(document_type_list):
for type in document_type_list:
# gets list of extensions with key document type
extension_list = self._extension_list_by_type.get(type)
for ext in iter(extension_list):
for ext in extension_list:
if not ext in allowed_extension_list:
allowed_extension_list.append(ext)
return tuple(allowed_extension_list)
......
from request import MonitorRequest
from memory import MonitorMemory
from sleeping_time import MonitorSpleepingTime
from .request import MonitorRequest
from .memory import MonitorMemory
from .sleeping_time import MonitorSpleepingTime
from cloudooo.handler.ooo.application.openoffice import openoffice
from cloudooo.util import convertStringToBool
......
......@@ -28,7 +28,7 @@
#
##############################################################################
from monitor import Monitor
from .monitor import Monitor
from multiprocessing import Process
import psutil
from cloudooo.util import logger
......@@ -54,7 +54,7 @@ class MonitorMemory(Monitor, Process):
if not hasattr(self, 'process') or \
self.process.pid != int(self.openoffice.pid()):
self.create_process()
return self.process.memory_info().rss / (1024 * 1024)
return self.process.memory_info().rss // (1024 * 1024)
except TypeError:
logger.debug("OpenOffice is stopped")
return 0
......
......@@ -33,7 +33,7 @@ from cloudooo.interfaces.monitor import IMonitor
@implementer(IMonitor)
class Monitor(object):
class Monitor:
""" """
def __init__(self, openoffice, interval):
......
......@@ -28,7 +28,7 @@
#
##############################################################################
from monitor import Monitor
from .monitor import Monitor
from threading import Thread
from cloudooo.util import logger
from time import sleep
......@@ -55,8 +55,8 @@ class MonitorRequest(Monitor, Thread):
while self.status_flag:
if self.openoffice.request > self.request_limit:
self.openoffice.acquire()
logger.debug("Openoffice: %s, %s will be restarted" % \
self.openoffice.getAddress())
logger.debug("Openoffice: %s, %s will be restarted",
*self.openoffice.getAddress())
self.openoffice.restart()
self.openoffice.release()
sleep(self.interval)
......
......@@ -28,7 +28,7 @@
#
##############################################################################
from monitor import Monitor
from .monitor import Monitor
from threading import Thread
from cloudooo.util import logger
from time import sleep, time
......@@ -66,8 +66,8 @@ class MonitorSpleepingTime(Monitor, Thread):
current_time = time()
if self.openoffice.status() and\
(self._touched_at + self.sleeping_time) <= current_time:
logger.debug("Stopping OpenOffice after sleeping time of %is" %\
self.sleeping_time)
logger.debug("Stopping OpenOffice after sleeping time of %is",
self.sleeping_time)
self.openoffice.acquire()
self.openoffice.stop()
self.openoffice.release()
......
......@@ -28,7 +28,7 @@
#
##############################################################################
from monitor import Monitor
from .monitor import Monitor
from multiprocessing import Process
from time import sleep
from cloudooo.util import logger
......@@ -47,14 +47,14 @@ class MonitorTimeout(Monitor, Process):
"""Start the process"""
port = self.openoffice.getAddress()[-1]
pid = self.openoffice.pid()
logger.debug("Monitoring OpenOffice: Port %s, Pid: %s" % (port, pid))
logger.debug("Monitoring OpenOffice: port %s pid %s timeout: %s", port, pid, self.interval)
self.status_flag = True
sleep(self.interval)
if self.openoffice.isLocked():
logger.debug("Stop OpenOffice - Port %s - Pid %s" % (port, pid))
logger.debug("Stoping OpenOffice: port %s pid %s", port, pid)
self.openoffice.stop()
def terminate(self):
"""Stop the process"""
Monitor.terminate(self)
Process.terminate(self)
Process.terminate(self)
\ No newline at end of file
......@@ -31,11 +31,11 @@
import sys
from multiprocessing import Process
from os import listdir
from xmlrpclib import ServerProxy
from xmlrpc.client import ServerProxy
from os.path import join
from getopt import getopt, GetoptError
from time import ctime, time
from base64 import encodestring
from base64 import encodebytes
__doc__ = """
usage: python HighTestLoad.py [options]
......@@ -51,7 +51,7 @@ Options:
"""
class Log(object):
class Log:
"""Object to manipulate the log file"""
def __init__(self, log_path, mode='a'):
......@@ -109,7 +109,7 @@ class Client(Process):
folder_list = listdir(self.folder)[:self.number_of_request]
for filename in folder_list:
file_path = join(self.folder, filename)
data = encodestring(open(file_path).read())
data = encodebytes(open(file_path).read())
self.log.msg("Input: %s\n" % file_path)
try:
now = time()
......@@ -118,7 +118,7 @@ class Client(Process):
self.log.msg("Duration: %s\n" % response_duration)
time_list.append(response_duration)
self.log.flush()
except Exception, err:
except Exception as err:
self.log.msg("%s\n" % str(err))
self.number_of_request -= 1
......@@ -132,15 +132,15 @@ def main():
help_msg = "\nUse --help or -h"
try:
opt_list, arg_list = getopt(sys.argv[1:], "hc:n:f:s:l:", ["help"])
except GetoptError, msg:
except GetoptError as msg:
msg = msg.msg + help_msg
print >> sys.stderr, msg
print(msg, file=sys.stderr)
sys.exit(2)
kw = {}
for opt, arg in opt_list:
if opt in ('-h', '--help'):
print >> sys.stdout, __doc__
print(__doc__, file=sys.stdout)
sys.exit(2)
elif opt == '-c':
number_client = int(arg)
......
......@@ -31,10 +31,10 @@
import sys
from multiprocessing import Process
from os import listdir
from xmlrpclib import ServerProxy
from xmlrpc.client import ServerProxy
from os.path import join
from getopt import getopt, GetoptError
from base64 import encodestring
from base64 import encodebytes
"""
XXX - This file is duplicated and should be refactored to be used for all
......@@ -58,7 +58,7 @@ Options:
"""
class Log(object):
class Log:
"""Object to manipulate the log file"""
def __init__(self, log_path, mode='a'):
......@@ -114,13 +114,13 @@ class Client(Process):
folder_list = listdir(self.folder)[:self.number_of_request]
for filename in folder_list:
file_path = join(self.folder, filename)
data = encodestring(open(file_path).read())
data = encodebytes(open(file_path).read())
try:
proxy.getTableItemList(data, 'odt')
proxy.getChapterItemList(data, 'odt')
proxy.getImageItemList(data, 'odt')
self.log.msg("Success\n")
except Exception, err:
except Exception as err:
self.log.msg("Error - %s\n"%str(err))
self.number_of_request -= 1
self.log.close()
......@@ -131,15 +131,15 @@ def main():
help_msg = "\nUse --help or -h"
try:
opt_list, arg_list = getopt(sys.argv[1:], "hc:n:f:s:l:", ["help"])
except GetoptError, msg:
except GetoptError as msg:
msg = msg.msg + help_msg
print >> sys.stderr, msg
print(msg, file=sys.stderr)
sys.exit(2)
kw = {}
for opt, arg in opt_list:
if opt in ('-h', '--help'):
print >> sys.stdout, __doc__
print(__doc__, file=sys.stdout)
sys.exit(2)
elif opt == '-c':
number_client = int(arg)
......
......@@ -62,5 +62,5 @@ class TestAllFormats(TestCase):
request = {'document_type': document_type}
extension_list = self.proxy.getAllowedExtensionList(request)
for extension in extension_list:
with self.subTest(extension):
self._testConvertFile(input_url, source_format, extension[0], None)
......@@ -28,9 +28,10 @@
#
##############################################################################
from xmlrpclib import Fault
from xmlrpc.client import Fault
from cloudooo.tests.cloudoooTestCase import TestCase
from base64 import encodestring
from base64 import encodebytes
class TestAllFormatsERP5Compatibility(TestCase):
"""
......@@ -54,27 +55,26 @@ class TestAllFormatsERP5Compatibility(TestCase):
"""Test all spreadsheet formats"""
self.runTestForType('data/test.ods', 'ods', 'application/vnd.oasis.opendocument.spreadsheet')
def runTestForType(self, filename, source_format, source_mimetype):
def runTestForType(self, filename:str, source_format:str, source_mimetype:str) -> None:
"""Generic test"""
extension_list = self.proxy.getAllowedTargetItemList(source_mimetype)[1]['response_data']
fault_list = []
for extension, format in extension_list:
try:
data_output = self.proxy.run_generate(filename,
encodestring(
open(filename).read()),
with open(filename, 'rb') as f:
encoded_data = encodebytes(f.read()).decode()
for extension, _ in extension_list:
with self.subTest(extension):
_, data_output, fault = self.proxy.run_generate(filename,
encoded_data,
None,
extension,
source_mimetype)[1]
source_mimetype)
self.assertFalse(fault)
data_output = data_output['data']
file_type = self._getFileType(data_output)
if file_type.endswith(": empty"):
fault_list.append((source_format, extension, file_type))
except Fault as err:
fault_list.append((source_format, extension, err.faultString))
if fault_list:
template_message = 'input_format: %r\noutput_format: %r\n traceback:\n%s'
message = '\n'.join([template_message % fault for fault in fault_list])
self.fail('Failed Conversions:\n' + message)
self.assertNotIn(": empty", file_type)
if source_format != extension:
# when no filter exist for destination format, the document is not converted
# but silently returned in source format, the assertion below should detect
# this.
self.assertNotEqual(source_mimetype, file_type)
......@@ -30,8 +30,8 @@
import unittest
import magic
from StringIO import StringIO
from base64 import decodestring
from io import BytesIO
from base64 import decodebytes
from os import path
from zipfile import ZipFile
from cloudooo.handler.ooo.document import FileSystemDocument
......@@ -43,7 +43,7 @@ class TestFileSystemDocument(unittest.TestCase):
def setUp(self):
"""Create data to tests and instantiated a FileSystemDocument"""
self.tmp_url = '/tmp'
self.data = decodestring("cloudooo Test")
self.data = decodebytes(b"cloudooo Test")
self.fsdocument = FileSystemDocument(self.tmp_url, self.data, 'txt')
def tearDown(self):
......@@ -58,7 +58,8 @@ class TestFileSystemDocument(unittest.TestCase):
document_filename = "document"
document_test_url = path.join(self.fsdocument.directory_name,
document_filename)
open(document_test_url, 'wb').write(decodestring("Test Document"))
with open(document_test_url, 'wb') as f:
f.write(decodebytes(b"Test Document"))
self.fsdocument.reload(document_test_url)
self.assertEqual(path.exists(old_document_url), False)
self.assertNotEqual(self.fsdocument.original_data,
......@@ -82,7 +83,8 @@ class TestFileSystemDocument(unittest.TestCase):
def testLoadDocumentFile(self):
"""Test if the document is created correctly"""
url = self.fsdocument.getUrl()
tmp_document = open(url, 'r').read()
with open(url, 'rb') as f:
tmp_document = f.read()
self.assertEqual(self.data, tmp_document)
self.fsdocument.trash()
self.assertEqual(path.exists(url), False)
......@@ -93,7 +95,8 @@ class TestFileSystemDocument(unittest.TestCase):
document_filename = "document"
document_test_url = path.join(self.fsdocument.directory_name,
document_filename)
open(document_test_url, 'wb').write(self.data)
with open(document_test_url, 'wb') as f:
f.write(self.data)
self.fsdocument.reload(document_test_url)
url = self.fsdocument.getUrl()
self.assertEqual(path.exists(old_document_url), False)
......@@ -103,12 +106,13 @@ class TestFileSystemDocument(unittest.TestCase):
def testZipDocumentList(self):
"""Tests if the zip file is returned correctly"""
open(path.join(self.fsdocument.directory_name, 'document2'), 'w').write('test')
with open(path.join(self.fsdocument.directory_name, 'document2'), 'w') as f:
f.write('test')
zip_file = self.fsdocument.getContent(True)
mime = magic.Magic(mime=True)
mimetype = mime.from_buffer(zip_file)
self.assertEqual(mimetype, 'application/zip')
ziptest = ZipFile(StringIO(zip_file), 'r')
ziptest = ZipFile(BytesIO(zip_file), 'r')
self.assertEqual(len(ziptest.filelist), 2)
for file in ziptest.filelist:
if file.filename.endswith("document2"):
......@@ -118,15 +122,15 @@ class TestFileSystemDocument(unittest.TestCase):
def testSendZipFile(self):
"""Tests if the htm is extrated from zipfile"""
zip_input_url = 'data/test.zip'
data = open(zip_input_url).read()
with open('./data/test.zip', 'rb') as f:
data = f.read()
zipdocument = FileSystemDocument(self.tmp_url, data, 'zip')
mime = magic.Magic(mime=True)
mimetype = mime.from_buffer(zipdocument.getContent(True))
self.assertEqual(mimetype, "application/zip")
mimetype = mime.from_buffer(zipdocument.getContent())
self.assertEqual(mimetype, "text/html")
zipfile = ZipFile(StringIO(zipdocument.getContent(True)))
zipfile = ZipFile(BytesIO(zipdocument.getContent(True)))
self.assertEqual(sorted(zipfile.namelist()),
sorted(['logo.gif', 'test.htm']))
......
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2010 Nexedi SA and Contributors. All Rights Reserved.
......@@ -30,7 +29,7 @@
##############################################################################
from zipfile import ZipFile
from StringIO import StringIO
from io import BytesIO
from lxml import etree
from cloudooo.tests.handlerTestCase import HandlerTestCase
from cloudooo.handler.ooo.granulator import OOGranulator
......@@ -39,7 +38,8 @@ from cloudooo.handler.ooo.granulator import OOGranulator
class TestOOGranulator(HandlerTestCase):
def setUp(self):
data = open('./data/granulate_test.odt').read()
with open('./data/granulate_test.odt', 'rb') as f:
data = f.read()
self.oogranulator = OOGranulator(data, 'odt')
def testOdfWithoutContentXml(self):
......@@ -56,7 +56,8 @@ class TestOOGranulator(HandlerTestCase):
def testgetTableItemList(self):
"""Test if getTableItemList() returns the right tables list"""
data = open('./data/granulate_table_test.odt').read()
with open('./data/granulate_table_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
table_list = [('Developers', ''),
('Prices', 'Table 1: Prices table from Mon Restaurant'),
......@@ -65,10 +66,11 @@ class TestOOGranulator(HandlerTestCase):
def testGetTable(self):
"""Test if getTable() returns on odf file with the right table"""
data = open('./data/granulate_table_test.odt').read()
with open('./data/granulate_table_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
table_data_doc = oogranulator.getTable('Developers')
content_xml_str = ZipFile(StringIO(table_data_doc)).read('content.xml')
content_xml_str = ZipFile(BytesIO(table_data_doc)).read('content.xml')
content_xml = etree.fromstring(content_xml_str)
table_list = content_xml.xpath('//table:table',
namespaces=content_xml.nsmap)
......@@ -79,21 +81,24 @@ class TestOOGranulator(HandlerTestCase):
def testGetTableItemWithoutSuccess(self):
"""Test if getTable() returns None for an non existent table name"""
data = open('./data/granulate_table_test.odt').read()
with open('./data/granulate_table_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
table_data = oogranulator.getTable('NonExistentTable')
self.assertEqual(table_data, None)
def testGetColumnItemList(self):
"""Test if getColumnItemList() returns the right table columns list"""
data = open('./data/granulate_table_test.odt').read()
with open('./data/granulate_table_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
self.assertEqual([[0, 'Name'], [1, 'Country']],
oogranulator.getColumnItemList('SoccerTeams'))
def testGetLineItemList(self):
"""Test if getLineItemList() returns the right table lines list"""
data = open('./data/granulate_table_test.odt').read()
with open('./data/granulate_table_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
matrix = [['Name', 'Phone', 'Email'],
['Hugo', '+55 (22) 8888-8888', 'hugomaia@tiolive.com'],
......@@ -121,8 +126,9 @@ class TestOOGranulator(HandlerTestCase):
def testGetImageSuccessfully(self):
"""Test if getImage() returns the right image file successfully"""
data = open('./data/granulate_test.odt').read()
zip = ZipFile(StringIO(data))
with open('./data/granulate_test.odt', 'rb') as f:
data = f.read()
zip = ZipFile(BytesIO(data))
image_id = '10000000000000C80000009C38276C51.jpg'
original_image = zip.read('Pictures/%s' % image_id)
geted_image = self.oogranulator.getImage(image_id)
......@@ -131,13 +137,14 @@ class TestOOGranulator(HandlerTestCase):
def testGetImageWithoutSuccess(self):
"""Test if getImage() returns an empty string for not existent id"""
obtained_image = self.oogranulator.getImage('anything.png')
self.assertEqual('', obtained_image)
self.assertEqual(b'', obtained_image)
def testGetParagraphItemList(self):
"""Test if getParagraphItemList() returns the right paragraphs list, with
the ids always in the same order"""
for i in range(5):
data = open('./data/granulate_test.odt').read()
for _ in range(5):
with open('./data/granulate_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
paragraph_list = oogranulator.getParagraphItemList()
self.assertEqual((0, 'P3'), paragraph_list[0])
......@@ -153,8 +160,8 @@ class TestOOGranulator(HandlerTestCase):
big_paragraph = self.oogranulator.getParagraph(5)
self.assertEqual('P8', big_paragraph[1])
self.assertTrue(big_paragraph[0].startswith(u'A prática cotidiana prova'))
self.assertTrue(big_paragraph[0].endswith(u'corresponde às necessidades.'))
self.assertTrue(big_paragraph[0].startswith('A prática cotidiana prova'))
self.assertTrue(big_paragraph[0].endswith('corresponde às necessidades.'))
def testGetParagraphItemWithoutSuccess(self):
"""Test if getParagraphItem() returns None for not existent id"""
......@@ -162,7 +169,8 @@ class TestOOGranulator(HandlerTestCase):
def testGetChapterItemList(self):
"""Test if getChapterItemList() returns the right chapters list"""
data = open('./data/granulate_chapters_test.odt').read()
with open('./data/granulate_chapters_test.odt', 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
self.assertEqual([(0, 'Title 0'), (1, 'Title 1'), (2, 'Title 2'),
(3, 'Title 3'), (4, 'Title 4'), (5, 'Title 5'),
......@@ -172,7 +180,8 @@ class TestOOGranulator(HandlerTestCase):
def testGetChapterItem(self):
"""Test if getChapterItem() returns the right chapter"""
data = open("./data/granulate_chapters_test.odt").read()
with open("./data/granulate_chapters_test.odt", 'rb') as f:
data = f.read()
oogranulator = OOGranulator(data, 'odt')
self.assertEqual(['Title 1', 1], oogranulator.getChapterItem(1))
......@@ -28,7 +28,7 @@
#
##############################################################################
from base64 import encodestring, decodestring
from base64 import encodebytes, decodebytes
from multiprocessing import Process, Array
from cloudooo.tests.cloudoooTestCase import TestCase
import magic
......@@ -39,8 +39,8 @@ mime_decoder = magic.Magic(mime=True)
def basicTestToGenerate(id, proxy, data, source_format, destination_format,
result_list):
"""Test to use method generate of server"""
document = proxy.convertFile(encodestring(data), source_format, destination_format)
mimetype = mime_decoder.from_buffer(decodestring(document))
document = proxy.convertFile(encodebytes(data), source_format, destination_format)
mimetype = mime_decoder.from_buffer(decodebytes(document))
assert mimetype == 'application/pdf'
result_list[id] = True
......
......@@ -46,7 +46,7 @@ from cloudooo.interfaces.monitor import IMonitor
from cloudooo.interfaces.granulate import ITableGranulator, \
IImageGranulator, \
ITextGranulator
from cloudooo.tests.backportUnittest import TestCase, expectedFailure
from unittest import TestCase, expectedFailure
import zope.interface.verify
......
......@@ -28,7 +28,7 @@
#
##############################################################################
from base64 import encodestring
from base64 import encodebytes
from cloudooo.tests.cloudoooTestCase import TestCase
from pkg_resources import resource_filename
......@@ -39,10 +39,11 @@ class TestLegacyInterface(TestCase):
"""Check implicit base conversion of HTML documents."""
filename = resource_filename('cloudooo.handler.ooo.tests.data',
'test_failure_conversion.html')
data = open(filename, 'r').read()
with open(filename, 'rb') as f:
data = f.read()
status, response_dict, message = self.proxy.run_convert(
filename,
encodestring(data),
encodebytes(data).decode(),
None,
None,
'text/html')
......@@ -55,9 +56,10 @@ class TestLegacyInterface(TestCase):
"""Check conversion of HTML to odt"""
filename = resource_filename('cloudooo.handler.ooo.tests.data',
'test_failure_conversion.html')
data = open(filename, 'r').read()
with open(filename, 'rb') as f:
data = f.read()
status, response_dict, message = self.proxy.run_generate(filename,
encodestring(data),
encodebytes(data).decode(),
None,
'odt',
'text/html')
......
......@@ -33,7 +33,6 @@ from time import sleep
from cloudooo.handler.ooo.application.openoffice import openoffice
from cloudooo.handler.ooo.monitor.memory import MonitorMemory
from psutil import Process
from types import IntType
OPENOFFICE = True
......@@ -69,11 +68,14 @@ class TestMonitorMemory(unittest.TestCase):
def testMonitorWithLittleMemoryLimit(self):
"""Test the monitor with limit of 10Mb. In This case the openoffice will be
stopped"""
stopped after some time"""
try:
self.monitor = MonitorMemory(openoffice, 2, 10)
self.monitor = MonitorMemory(openoffice, 1, 10)
self.monitor.start()
sleep(self.interval)
for _ in range(30):
sleep(1)
if not openoffice.status():
break
self.assertEqual(openoffice.status(), False)
finally:
self.monitor.terminate()
......@@ -94,17 +96,15 @@ class TestMonitorMemory(unittest.TestCase):
self.monitor = MonitorMemory(openoffice, 2, 400)
self.monitor.create_process()
self.assertTrue(hasattr(self.monitor, 'process'))
self.assertEqual(type(self.monitor.process), Process)
self.assertIsInstance(self.monitor.process, Process)
def testGetMemoryUsage(self):
"""Test memory usage"""
self.monitor = MonitorMemory(openoffice, 2, 400)
openoffice.stop()
memory_usage_int = self.monitor.get_memory_usage()
self.assertEqual(memory_usage_int, 0)
memory_usage = self.monitor.get_memory_usage()
self.assertEqual(memory_usage, 0)
if not openoffice.status():
openoffice.start()
memory_usage_int = self.monitor.get_memory_usage()
self.assertEqual(type(memory_usage_int), IntType)
memory_usage = self.monitor.get_memory_usage()
self.assertIsInstance(memory_usage, int)
......@@ -61,13 +61,15 @@ class TestMonitorTimeout(unittest.TestCase):
try:
monitor_timeout = self._createMonitor(1)
monitor_timeout.start()
sleep(2)
sleep(5)
self.assertEqual(openoffice.status(), False)
openoffice.restart()
self.assertTrue(openoffice.status())
finally:
monitor_timeout.terminate()
openoffice.release()
try:
monitor_timeout.terminate()
finally:
openoffice.release()
def testStopOpenOfficeTwice(self):
"""Test the cases that is necessary start the monitors twice"""
......@@ -75,20 +77,22 @@ class TestMonitorTimeout(unittest.TestCase):
try:
monitor_timeout = self._createMonitor(1)
monitor_timeout.start()
sleep(2)
sleep(5)
self.assertEqual(openoffice.status(), False)
monitor_timeout.terminate()
openoffice.restart()
self.assertTrue(openoffice.status())
monitor_timeout = self._createMonitor(1)
monitor_timeout.start()
sleep(2)
sleep(5)
self.assertEqual(openoffice.status(), False)
monitor_timeout.terminate()
sleep(1)
self.assertEqual(monitor_timeout.is_alive(), False)
finally:
monitor_timeout.terminate()
openoffice.release()
try:
monitor_timeout.terminate()
finally:
openoffice.release()
......@@ -37,17 +37,18 @@ from cloudooo.handler.ooo.document import OdfDocument
class TestOdfDocument(HandlerTestCase):
def setUp(self):
data = open('./data/granulate_test.odt').read()
with open('./data/granulate_test.odt', 'rb') as f:
data = f.read()
self.oodocument = OdfDocument(data, 'odt')
def testReceivedGoodFile(self):
"""Test if received path is from a good document returing an ZipFile"""
self.assertTrue(isinstance(self.oodocument._zipfile, ZipFile))
self.assertIsInstance(self.oodocument._zipfile, ZipFile)
def testGetContentXml(self):
"""Test if the getContentXml method returns the content.xml file"""
content_xml = self.oodocument.getContentXml()
self.assertTrue('The content of this file is just' in content_xml)
self.assertIn(b'The content of this file is just', content_xml)
def testGetExistentFile(self):
"""Test if the getFile method returns the requested file"""
......@@ -57,12 +58,12 @@ class TestOdfDocument(HandlerTestCase):
def testGetNotPresentFile(self):
"""Test if the getFile method returns None for not present file request"""
requested_file = self.oodocument.getFile('not_present.xml')
self.assertEqual(requested_file, '')
self.assertEqual(requested_file, b'')
def testParseContent(self):
"""Test if the _parsed_content attribute is the parsed content.xml"""
# XXX not sure it is good to store parsed document everytime
self.assertTrue(isinstance(self.oodocument.parsed_content, etree._Element))
self.assertIsInstance(self.oodocument.parsed_content, etree._Element)
self.assertTrue(self.oodocument.parsed_content.tag.endswith(
'document-content'))
......
This diff is collapsed.
......@@ -50,7 +50,8 @@ class TestUnoConverter(HandlerTestCase):
""" """
openoffice.acquire()
self.hostname, self.port = openoffice.getAddress()
data = open("data/test.odt", 'r').read()
with open("data/test.odt", "rb") as f:
data = f.read()
self.document = FileSystemDocument(self.tmp_url, data, 'odt')
def tearDown(self):
......@@ -78,6 +79,7 @@ class TestUnoConverter(HandlerTestCase):
"--source_format=%s" % "odt",
"--mimemapper=%s" % mimemapper_pickled]
stdout, stderr = Popen(command,
text=True,
stdout=PIPE,
stderr=PIPE).communicate()
self.assertEqual(stderr, '')
......
......@@ -67,11 +67,10 @@ class TestUnoMimeMapper(HandlerTestCase):
"--port=%s" % self.openoffice_port]
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
stderr=PIPE,
text=True).communicate()
self.assertEqual(stderr, '')
filter_dict, type_dict = json.loads(stdout)
self.assertTrue('filter_dict' in locals())
self.assertTrue('type_dict' in locals())
self.assertNotEqual(filter_dict.get('writer8'), None)
self.assertEqual(type_dict.get('writer8').get('Name'), 'writer8')
self.assertNotEqual(filter_dict.get('writer8'), None)
......@@ -88,11 +87,10 @@ class TestUnoMimeMapper(HandlerTestCase):
"--port=%s" % self.openoffice_port]
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
stderr=PIPE,
text=True).communicate()
self.assertEqual(stderr, '')
filter_dict, type_dict = json.loads(stdout)
self.assertTrue('filter_dict' in locals())
self.assertTrue('type_dict' in locals())
self.assertNotEqual(filter_dict.get('writer8'), None)
self.assertEqual(type_dict.get('writer8').get('Name'), 'writer8')
self.assertNotEqual(filter_dict.get('writer8'), None)
......@@ -101,7 +99,7 @@ class TestUnoMimeMapper(HandlerTestCase):
def testWithoutOpenOffice(self):
"""Test when the openoffice is stopped"""
error_msg = "couldn\'t connect to socket (Success)\n"
error_msg = "couldn\'t connect to socket"
hostname, host = openoffice.getAddress()
openoffice.stop()
python = path.join(self.office_binary_path, "python")
......@@ -117,9 +115,10 @@ class TestUnoMimeMapper(HandlerTestCase):
for i in range(10):
stdout, stderr = Popen(command,
stdout=PIPE,
stderr=PIPE).communicate()
stderr=PIPE,
text=True).communicate()
self.assertEqual(stdout, '')
self.assertTrue(stderr.endswith(error_msg), stderr)
self.assertIn(error_msg, stderr)
openoffice.start()
......@@ -28,6 +28,7 @@
#
##############################################################################
import contextlib
from socket import socket, error
from errno import EADDRINUSE
from time import sleep
......@@ -47,7 +48,8 @@ def removeDirectory(path):
def socketStatus(hostname, port):
"""Verify if the address is busy."""
try:
socket().bind((hostname, port),)
with contextlib.closing(socket()) as sock:
sock.bind((hostname, port))
# False if the is free
return False
except error as err:
......@@ -61,7 +63,7 @@ def waitStartDaemon(daemon, attempts):
for num in range(attempts):
if daemon.status():
return True
elif daemon.pid() is None:
elif daemon.hasExited():
return False
sleep(1)
return False
......@@ -70,7 +72,7 @@ def waitStartDaemon(daemon, attempts):
def waitStopDaemon(daemon, attempts=5):
"""Wait a certain time to stop the daemon."""
for num in range(attempts):
if not daemon.status():
if not daemon.status() or daemon.hasExited():
return True
sleep(1)
return False
......
......@@ -36,11 +36,13 @@ from cloudooo.util import logger, parseContentType
from subprocess import Popen, PIPE
from tempfile import mktemp
from pyPdf import PdfFileWriter, PdfFileReader
from pyPdf.generic import NameObject, createStringObject
from pypdf import PdfWriter, PdfReader
from pypdf.generic import NameObject, createStringObject
@implementer(IHandler)
class Handler(object):
class Handler:
"""PDF Handler is used to handler inputed pdf document."""
def __init__(self, base_folder_url, data, source_format, **kw):
......@@ -52,7 +54,7 @@ class Handler(object):
def convert(self, destination_format=None, **kw):
""" Convert a pdf document """
# TODO: use pyPdf
logger.debug("PDFConvert: %s > %s" % (self.document.source_format, destination_format))
logger.debug("PDFConvert: %s > %s", self.document.source_format, destination_format)
output_url = mktemp(suffix=".%s" % destination_format,
dir=self.document.directory_name)
command = ["pdftotext", self.document.getUrl(), output_url]
......@@ -77,8 +79,9 @@ class Handler(object):
stdout=PIPE,
stderr=PIPE,
close_fds=True,
text=True,
env=self.environment).communicate()
info_list = filter(None, stdout.split("\n"))
info_list = [_f for _f in stdout.split("\n") if _f]
metadata = {}
for info in iter(info_list):
info = info.split(":")
......@@ -94,8 +97,8 @@ class Handler(object):
metadata -- expected an dictionary with metadata.
"""
# TODO: date as "D:20090401124817-04'00'" ASN.1 for ModDate and CreationDate
input_pdf = PdfFileReader(open(self.document.getUrl(), "rb"))
output_pdf = PdfFileWriter()
input_pdf = PdfReader(self.document.getUrl())
output_pdf = PdfWriter()
modification_date = metadata.pop("ModificationDate", None)
if modification_date:
......@@ -103,13 +106,13 @@ class Handler(object):
if type(metadata.get('Keywords', None)) is list:
metadata['Keywords'] = metadata['Keywords'].join(' ')
args = {}
for key, value in list(metadata.items()):
for key, value in metadata.items():
args[NameObject('/' + key.capitalize())] = createStringObject(value)
output_pdf._info.getObject().update(args)
output_pdf._info.get_object().update(args)
for page_num in range(input_pdf.getNumPages()):
output_pdf.addPage(input_pdf.getPage(page_num))
for page in input_pdf.pages:
output_pdf.add_page(page)
output_stream = io.BytesIO()
output_pdf.write(output_stream)
......@@ -124,7 +127,7 @@ class Handler(object):
...
]
"""
source_mimetype = parseContentType(source_mimetype).gettype()
source_mimetype = parseContentType(source_mimetype).get_content_type()
if source_mimetype in ("application/pdf", "pdf"):
return [("text/plain", "Plain Text")]
return []
......@@ -31,7 +31,6 @@
from cloudooo.handler.pdf.handler import Handler
from cloudooo.tests.handlerTestCase import HandlerTestCase
from types import DictType
class TestHandler(HandlerTestCase):
......@@ -41,23 +40,26 @@ class TestHandler(HandlerTestCase):
def testConvertPDFtoText(self):
"""Test conversion of pdf to txt"""
pdf_document = open("data/test.pdf").read()
with open("data/test.pdf", "rb") as f:
pdf_document = f.read()
handler = Handler(self.tmp_url, pdf_document, "pdf", **self.kw)
txt_document = handler.convert("txt")
self.assertTrue(txt_document.startswith("UNG Docs Architecture"))
self.assertTrue(txt_document.startswith(b"UNG Docs Architecture"))
def testgetMetadata(self):
"""Test if the metadata are extracted correctly"""
pdf_document = open("data/test.pdf").read()
with open("data/test.pdf", "rb") as f:
pdf_document = f.read()
handler = Handler(self.tmp_url, pdf_document, "pdf", **self.kw)
metadata = handler.getMetadata()
self.assertEqual(type(metadata), DictType)
self.assertIsInstance(metadata, dict)
self.assertNotEqual(metadata, {})
self.assertEqual(metadata["title"], 'Free Cloud Alliance Presentation')
def testsetMetadata(self):
"""Test if the metadata is inserted correctly"""
pdf_document = open("data/test.pdf").read()
with open("data/test.pdf", "rb") as f:
pdf_document = f.read()
handler = Handler(self.tmp_url, pdf_document, "pdf", **self.kw)
metadata_dict = {"title": "Set Metadata Test", "creator": "gabriel\'@"}
new_document = handler.setMetadata(metadata_dict)
......
......@@ -45,14 +45,17 @@ class TestServer(TestCase):
self.runConversionList(self.ConversionScenarioList())
def FaultConversionScenarioList(self):
return [
# Test to verify if server fail when a empty string is sent
('', '', ''),
# Try convert one video for a invalid format
(open(join('data', 'test.pdf')).read(), 'pdf', 'xyz'),
# Try convert one video to format not possible
(open(join('data', 'test.pdf')).read(), 'pdf', 'ogv'),
]
scenario_list = [
# Test to verify if server fail when a empty file is sent
(b'', '', ''),
]
# Try convert one video for a invalid format
with open(join('data', 'test.pdf'), 'rb') as f:
scenario_list.append((f.read(), 'pdf', 'xyz'))
# Try convert one video to format not possible
with open(join('data', 'test.pdf'), 'rb') as f:
scenario_list.append((f.read(), 'pdf', 'ogv'))
return scenario_list
def testFaultConversion(self):
"""Test fail convertion of Invalid pdf files"""
......
......@@ -40,7 +40,7 @@ def keyNameToOption(key_name, prefix=""):
return "--" + prefix + key_name.replace("_", "-")
@implementer(IHandler)
class Handler(object):
class Handler:
"""ImageMagic Handler is used to handler images."""
def __init__(self, base_folder_url, data, source_format, **kw):
......@@ -66,7 +66,7 @@ class Handler(object):
def convert(self, destination_format=None, **kw):
"""Convert a image"""
logger.debug("wkhtmltopdf convert: %s > %s" % (self.file.source_format, destination_format))
logger.debug("wkhtmltopdf convert: %s > %s", self.file.source_format, destination_format)
output_path = self.makeTempFile(destination_format)
command = self.makeWkhtmltopdfCommandList(
self.convertPathToUrl(self.file.getUrl()),
......@@ -109,7 +109,7 @@ class Handler(object):
...
]
"""
source_mimetype = parseContentType(source_mimetype).gettype()
source_mimetype = parseContentType(source_mimetype).get_content_type()
if source_mimetype in ("text/html", "htm", "html"):
return [("application/pdf", "PDF - Portable Document Format")]
return []
......@@ -272,10 +272,12 @@ class Handler(object):
return option_list
def makeDataPathArgumentOptionList(self, *args, **kw):
return self.makeDataUrlArgumentOptionList(*args, url_type="path", **kw)
kw['url_type'] = "path"
return self.makeDataUrlArgumentOptionList(*args, **kw)
def makeDataFileArgumentOptionList(self, *args, **kw):
return self.makeDataUrlArgumentOptionList(*args, url_type="file", **kw)
kw['url_type'] = "file"
return self.makeDataUrlArgumentOptionList(*args, **kw)
def makeRepeatableDataUrlArgumentOptionList(self, allowed_option_list,
option_dict, **kw):
......
......@@ -40,7 +40,8 @@ class TestHandler(HandlerTestCase):
self.kw = dict(env=dict(PATH=self.env_path))
def _testBase(self, html_path, **conversion_kw):
html_file = open(html_path).read()
with open(html_path, 'rb') as f:
html_file = f.read()
handler = Handler(self.tmp_url, html_file, "html", **self.kw)
pdf_file = handler.convert("pdf", **conversion_kw)
mime = magic.Magic(mime=True)
......@@ -67,16 +68,18 @@ class TestHandler(HandlerTestCase):
def testConvertHtmlWithTableOfContent(self):
"""Test conversion of html with an additional table of content"""
with open("data/test_toc.xsl", 'rb') as f:
xsl_style_sheet_data = f.read()
self._testBase(
"data/test_with_toc.html",
toc=True,
xsl_style_sheet_data=b64encode(open("data/test_toc.xsl").read()),
xsl_style_sheet_data=b64encode(xsl_style_sheet_data),
)
# XXX how to check for table of content presence ?
def testsetMetadata(self):
""" Test if metadata are inserted correclty """
handler = Handler(self.tmp_url, "", "png", **self.kw)
handler = Handler(self.tmp_url, b"", "png", **self.kw)
self.assertRaises(NotImplementedError, handler.setMetadata)
def testGetAllowedConversionFormatList(self):
......
......@@ -44,10 +44,11 @@ class TestServer(TestCase):
self.runConversionList(self.ConversionScenarioList())
def FaultConversionScenarioList(self):
return [
# Test to verify if server fail when a empty string is sent
('', '', ''),
# Try convert one html for a invalid format
(open(join('data', 'test_with_png_dataurl.html')).read(), 'html', 'xyz'),
scenario_list = [
# Test to verify if server fail when a empty file is sent
(b'', '', ''),
]
# Try convert one html for a invalid format
with open(join('data', 'test_with_png_dataurl.html'), 'rb') as f:
scenario_list.append((f.read(), 'html', 'xyz'))
return scenario_list
......@@ -157,7 +157,7 @@ yformat_tuple = ("docy", "xlsy", "ppty")
@implementer(IHandler)
class Handler(object):
class Handler:
"""
X2T Handler is used to convert Microsoft Office 2007 documents to OnlyOffice
documents.
......@@ -183,7 +183,7 @@ class Handler(object):
def convert(self, destination_format=None, **kw):
""" Convert the inputed file to output as format that were informed """
source_format = self.file.source_format
logger.debug("x2t convert: %s > %s" % (source_format, destination_format))
logger.debug("x2t convert: %s > %s", source_format, destination_format)
# init vars and xml configuration file
in_format = format_code_map[source_format]
......@@ -198,7 +198,7 @@ class Handler(object):
output_data = None
if source_format in yformat_tuple:
if self._data.startswith("PK\x03\x04"):
if self._data.startswith(b"PK\x03\x04"):
os.mkdir(input_dir)
unzip(self.file.getUrl(), input_dir)
input_file_name = os.path.join(input_dir, "body.txt")
......@@ -227,7 +227,7 @@ class Handler(object):
root = ElementTree.Element('root')
for key, value in config.items():
ElementTree.SubElement(root, key).text = value
ElementTree.ElementTree(root).write(config_file, encoding='utf-8', xml_declaration=True,
ElementTree.ElementTree(root).write(config_file, encoding='unicode', xml_declaration=True,
default_namespace=None, method="xml")
# run convertion binary
......@@ -280,7 +280,7 @@ class Handler(object):
def getMetadata(self, base_document=False):
r"""Returns a dictionary with all metadata of document.
"""
if self._source_format in yformat_tuple and self._data.startswith("PK\x03\x04"):
if self._source_format in yformat_tuple and self._data.startswith(b"PK\x03\x04"):
if base_document:
openxml_format = yformat_map[self._source_format]
data = self.convert(yformat_map[self._source_format])
......@@ -305,7 +305,7 @@ class Handler(object):
"""
if metadata is None:
metadata = {}
if self._source_format in yformat_tuple and self._data.startswith("PK\x03\x04"):
if self._source_format in yformat_tuple and self._data.startswith(b"PK\x03\x04"):
root_dir = self.file.directory_name
output_file_name = os.path.join(root_dir, "tmp")
try:
......@@ -321,7 +321,8 @@ class Handler(object):
absolute_path = os.path.join(root, file_name)
file_name = os.path.join(relative_root, file_name)
zipfile.write(absolute_path, file_name)
output_data = open(output_file_name).read()
with open(output_file_name, 'rb') as f:
output_data = f.read()
finally:
os.unlink(output_file_name)
return output_data
......@@ -329,7 +330,7 @@ class Handler(object):
return OOoHandler(self.base_folder_url, self._data, self._source_format, **self._init_kw).setMetadata(metadata)
@staticmethod
def getAllowedConversionFormatList(source_mimetype):
def getAllowedConversionFormatList(source_mimetype:str):
"""Returns a list content_type and their titles which are supported
by enabled handlers.
......@@ -337,7 +338,7 @@ class Handler(object):
...
]
"""
source_mimetype = parseContentType(source_mimetype).gettype()
source_mimetype = parseContentType(source_mimetype).get_content_type()
if source_mimetype in ("docy", "application/x-asc-text"):
return [
("application/vnd.openxmlformats-officedocument.wordprocessingml.document", "Word 2007 Document"),
......
......@@ -30,7 +30,7 @@
from zipfile import ZipFile
from cStringIO import StringIO
from io import BytesIO
from cloudooo.handler.x2t.handler import Handler
from cloudooo.tests.handlerTestCase import HandlerTestCase
......@@ -44,80 +44,94 @@ class TestHandler(HandlerTestCase):
def testConvertXlsx(self):
"""Test conversion of xlsx to xlsy and back"""
y_data = Handler(self.tmp_url, open("data/test.xlsx").read(), "xlsx", **self.kw).convert("xlsy")
y_body_data = ZipFile(StringIO(y_data)).open("body.txt").read()
self.assertTrue(y_body_data.startswith("XLSY;v10;0;"), "%r... does not start with 'XLSY;v10;0;'" % (y_body_data[:20],))
with open("data/test.xlsx", "rb") as f:
data = f.read()
y_data = Handler(self.tmp_url, data, "xlsx", **self.kw).convert("xlsy")
y_body_data = ZipFile(BytesIO(y_data)).open("body.txt").read()
self.assertTrue(y_body_data.startswith(b"XLSY;v10;0;"), "{!r}... does not start with 'XLSY;v10;0;'".format(y_body_data[:20]))
x_data = Handler(self.tmp_url, y_data, "xlsy", **self.kw).convert("xlsx")
# magic inspired by https://github.com/minad/mimemagic/pull/19/files
self.assertIn("xl/", x_data[:2000])
self.assertIn(b"xl/", x_data[:2000])
def testConvertXlsy(self):
"""Test conversion of xlsy to xlsx and back"""
x_data = Handler(self.tmp_url, open("data/test_body.xlsy").read(), "xlsy", **self.kw).convert("xlsx")
self.assertIn("xl/", x_data[:2000])
x_data = Handler(self.tmp_url, open("data/test.xlsy").read(), "xlsy", **self.kw).convert("xlsx")
self.assertIn("xl/", x_data[:2000])
with open("data/test_body.xlsy", "rb") as f:
data = f.read()
x_data = Handler(self.tmp_url, data, "xlsy", **self.kw).convert("xlsx")
self.assertIn(b"xl/", x_data[:2000])
with open("data/test.xlsy", "rb") as f:
data = f.read()
x_data = Handler(self.tmp_url, data, "xlsy", **self.kw).convert("xlsx")
self.assertIn(b"xl/", x_data[:2000])
y_data = Handler(self.tmp_url, x_data, "xlsx", **self.kw).convert("xlsy")
y_body_data = ZipFile(StringIO(y_data)).open("body.txt").read()
self.assertTrue(y_body_data.startswith("XLSY;v10;0;"), "%r... does not start with 'XLSY;v10;0;'" % (y_body_data[:20],))
y_body_data = ZipFile(BytesIO(y_data)).open("body.txt").read()
self.assertTrue(y_body_data.startswith(b"XLSY;v10;0;"), "{!r}... does not start with 'XLSY;v10;0;'".format(y_body_data[:20]))
def testConvertDocx(self):
"""Test conversion of docx to docy and back"""
y_data = Handler(self.tmp_url, open("data/test_with_image.docx").read(), "docx", **self.kw).convert("docy")
y_zip = ZipFile(StringIO(y_data))
with open("data/test_with_image.docx", "rb") as f:
data = f.read()
y_data = Handler(self.tmp_url, data, "docx", **self.kw).convert("docy")
y_zip = ZipFile(BytesIO(y_data))
y_body_data = y_zip.open("body.txt").read()
self.assertTrue(y_body_data.startswith("DOCY;v10;0;"), "%r... does not start with 'DOCY;v10;0;'" % (y_body_data[:20],))
self.assertTrue(y_body_data.startswith(b"DOCY;v10;0;"), "{!r}... does not start with 'DOCY;v10;0;'".format(y_body_data[:20]))
y_zip.open("media/image1.png")
x_data = Handler(self.tmp_url, y_data, "docy", **self.kw).convert("docx")
# magic inspired by https://github.com/minad/mimemagic/pull/19/files
self.assertIn("word/", x_data[:2000])
self.assertIn(b"word/", x_data[:2000])
def testConvertDocy(self):
"""Test conversion of docy to docx and back"""
x_data = Handler(self.tmp_url, open("data/test_with_image.docy").read(), "docy", **self.kw).convert("docx")
self.assertIn("word/", x_data[:2000])
with open("data/test_with_image.docy", "rb") as f:
data = f.read()
x_data = Handler(self.tmp_url, data, "docy", **self.kw).convert("docx")
self.assertIn(b"word/", x_data[:2000])
y_data = Handler(self.tmp_url, x_data, "docx", **self.kw).convert("docy")
y_zip = ZipFile(StringIO(y_data))
y_zip = ZipFile(BytesIO(y_data))
y_body_data = y_zip.open("body.txt").read()
self.assertTrue(y_body_data.startswith("DOCY;v10;0;"), "%r... does not start with 'DOCY;v10;0;'" % (y_body_data[:20],))
self.assertTrue(y_body_data.startswith(b"DOCY;v10;0;"), "{!r}... does not start with 'DOCY;v10;0;'".format(y_body_data[:20]))
y_zip.open("media/image1.png")
def testgetMetadata(self):
"""Test getMetadata from yformats"""
handler = Handler(self.tmp_url, "", "xlsy", **self.kw)
handler = Handler(self.tmp_url, b"", "xlsy", **self.kw)
self.assertEqual(handler.getMetadata(), {
u'CreationDate': u'00/00/0000 00:00:00',
u'ImplementationName': u'com.sun.star.comp.comphelper.OPropertyBag',
u'MIMEType': u'text/plain',
u'ModificationDate': u'00/00/0000 00:00:00',
u'PrintDate': u'00/00/0000 00:00:00',
u'TemplateDate': u'00/00/0000 00:00:00',
'CreationDate': '00/00/0000 00:00:00',
'ImplementationName': 'com.sun.star.comp.comphelper.OPropertyBag',
'MIMEType': 'text/plain',
'ModificationDate': '00/00/0000 00:00:00',
'PrintDate': '00/00/0000 00:00:00',
'TemplateDate': '00/00/0000 00:00:00',
})
handler = Handler(self.tmp_url, open("data/test_with_metadata.xlsy").read(), "xlsy", **self.kw)
with open("data/test_with_metadata.xlsy", "rb") as f:
data = f.read()
handler = Handler(self.tmp_url, data, "xlsy", **self.kw)
self.assertEqual(handler.getMetadata(), {
u'CreationDate': u'31/01/2018 21:09:10',
u'Keywords': [u'\u0442\u0435\u0441\u0442', u'\u0441\u0430\u0431\u0436\u0435\u043a\u0442'],
'CreationDate': '31/01/2018 21:09:10',
'Keywords': ['\u0442\u0435\u0441\u0442', '\u0441\u0430\u0431\u0436\u0435\u043a\u0442'],
'MIMEType': 'application/x-asc-spreadsheet',
u'ModificationDate': u'31/01/2018 21:22:36',
u'PrintDate': u'00/00/0000 00:00:00',
u'Subject': u'\u0432\u044b\u043a\u043b\u044e\u0447\u0438 \u0442\u0435\u043b\u0435\u0432\u0438\u0437\u043e\u0440',
u'TemplateDate': u'00/00/0000 00:00:00',
u'Title': u'kesha'})
'ModificationDate': '31/01/2018 21:22:36',
'PrintDate': '00/00/0000 00:00:00',
'Subject': '\u0432\u044b\u043a\u043b\u044e\u0447\u0438 \u0442\u0435\u043b\u0435\u0432\u0438\u0437\u043e\u0440',
'TemplateDate': '00/00/0000 00:00:00',
'Title': 'kesha'})
def testsetMetadata(self):
"""Test setMetadata for yformats"""
handler = Handler(self.tmp_url, open("data/test_with_metadata.xlsy").read(), "xlsy", **self.kw)
with open("data/test_with_metadata.xlsy", "rb") as f:
data = f.read()
handler = Handler(self.tmp_url, data, "xlsy", **self.kw)
new_mime_data = handler.setMetadata({
"Title": "test title",
"Subject": "test subject",
"Keywords": "test keywords",
})
handler = Handler(self.tmp_url, new_mime_data, "xlsy", **self.kw)
self.assertEqual(handler.getMetadata(), {u'Keywords': u'test keywords', 'MIMEType': 'application/x-asc-spreadsheet', u'Title': u'test title', u'Subject': u'test subject'})
self.assertEqual(handler.getMetadata(), {'Keywords': 'test keywords', 'MIMEType': 'application/x-asc-spreadsheet', 'Title': 'test title', 'Subject': 'test subject'})
def testGetAllowedConversionFormatList(self):
"""Test all combination of mimetype
......
......@@ -28,7 +28,7 @@
##############################################################################
import io
import zipfile
from base64 import decodestring, encodestring
from base64 import decodebytes, encodebytes
from os.path import join
from cloudooo.tests.cloudoooTestCase import TestCase
......@@ -52,38 +52,40 @@ class TestServer(TestCase):
self.runConversionList(self.ConversionScenarioList())
def FaultConversionScenarioList(self):
return [
# Test to verify if server fail when a empty string is sent
('', '', ''),
# Try convert one xlsx for a invalid format
(open(join('data', 'test.xlsx')).read(), 'xlsx', 'xyz'),
scenario_list = [
# Test to verify if server fail when a empty file is sent
(b'', '', ''),
]
# Try convert one xlsx for a invalid format
with open(join('data', 'test.xlsx'), 'rb') as f:
scenario_list.append((f.read(), 'xlsx', 'xyz'))
return scenario_list
def test_xlsx_to_xlsy(self):
with open(join('data', 'test.xlsx')) as f:
with open(join('data', 'test.xlsx'), 'rb') as f:
xlsx_data = f.read()
xlsy_data = self.proxy.convertFile(
encodestring(xlsx_data),
encodebytes(xlsx_data).decode(),
'xlsx',
'xlsy',
False
)
self.assertEqual(
sorted(zipfile.ZipFile(io.BytesIO(decodestring(xlsy_data))).namelist()),
sorted(zipfile.ZipFile(io.BytesIO(decodebytes(xlsy_data.encode()))).namelist()),
sorted(['Editor.xlsx', 'body.txt', 'metadata.json'])
)
def test_docx_to_docy(self):
with open(join('data', 'test_with_image.docx')) as f:
with open(join('data', 'test_with_image.docx'), 'rb') as f:
docx_data = f.read()
docy_data = self.proxy.convertFile(
encodestring(docx_data),
encodebytes(docx_data).decode(),
'docx',
'docy',
False
)
self.assertEqual(
sorted(zipfile.ZipFile(io.BytesIO(decodestring(docy_data))).namelist()),
sorted(zipfile.ZipFile(io.BytesIO(decodebytes(docy_data.encode()))).namelist()),
sorted(['body.txt', 'media/image1.png', 'metadata.json'])
)
This diff is collapsed.
......@@ -51,7 +51,7 @@ def application(global_config, **local_config):
"""
prefix = 'env-'
environment_dict = {}
for parameter_name, value in local_config.iteritems():
for parameter_name, value in local_config.items():
if parameter_name.startswith(prefix):
value = value or ''
variable_name = parameter_name[len(prefix):]
......@@ -81,7 +81,7 @@ def application(global_config, **local_config):
mimetype_registry = local_config.get("mimetype_registry", "")
local_config["mimetype_registry"] = handler_mapping_list = \
filter(None, mimetype_registry.split("\n"))
[_f for _f in mimetype_registry.split("\n") if _f]
ooo_disable_filter_list = []
for filter_name in local_config.get("ooo_disable_filter_list", "").split("\n"):
......@@ -109,6 +109,6 @@ def application(global_config, **local_config):
handler_dict[handler] = module.Handler
local_config['handler_dict'] = handler_dict
from manager import Manager
from .manager import Manager
cloudooo_manager = Manager(cloudooo_path_tmp_dir, **local_config)
return WSGIXMLRPCApplication(instance=cloudooo_manager)
# Backport of Python 2.7 unittest chosen parts to be able to use the
# "skip" decorators, and the associated ExpectedFailure and
# UnexpectedSuccess.
#
# Implementation is mostly a direct translation from Python r75708
# grep for "BACK" comments for backport-specific remarks.
import unittest
import sys
import time
class SkipTest(Exception):
"""
Raise this exception in a test to skip it.
Usually you can use TestResult.skip() or one of the skipping decorators
instead of raising this directly.
"""
pass
class _ExpectedFailure(Exception):
"""
Raise this when a test is expected to fail.
This is an implementation detail.
"""
def __init__(self, exc_info):
Exception.__init__(self)
self.exc_info = exc_info
class _UnexpectedSuccess(Exception):
"""
The test was supposed to fail, but it didn't!
"""
pass
class SetupSiteError(Exception):
"""
The ERP5 Site could not have been setup.
This is raised when the site could not have been created in a previous
test. We want this to count as an error, but we do not want this to happear
in traceback for readability.
"""
pass
def _id(obj):
return obj
def skip(reason):
"""
Unconditionally skip a test.
"""
def decorator(test_item):
if isinstance(test_item, type) and issubclass(test_item, TestCase):
test_item.__unittest_skip__ = True
test_item.__unittest_skip_why__ = reason
return test_item
def skip_wrapper(*args, **kwargs):
raise SkipTest(reason)
skip_wrapper.__name__ = test_item.__name__
skip_wrapper.__doc__ = test_item.__doc__
return skip_wrapper
return decorator
def skipIf(condition, reason):
"""
Skip a test if the condition is true.
"""
if condition:
return skip(reason)
return _id
def skipUnless(condition, reason):
"""
Skip a test unless the condition is true.
"""
if not condition:
return skip(reason)
return _id
def expectedFailure(func):
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except Exception:
raise _ExpectedFailure(sys.exc_info())
raise _UnexpectedSuccess
wrapper.__name__ = func.__name__
wrapper.__doc__ = func.__doc__
return wrapper
class TestCase(unittest.TestCase):
"""We redefine here the run() method, and add a skipTest() method.
"""
failureException = AssertionError
def run(self, result=None):
orig_result = result
if result is None:
result = self.defaultTestResult()
# BACK: Not necessary for Python < 2.7:
# TestResult.startTestRun does not exist yet
# startTestRun = getattr(result, 'startTestRun', None)
# if startTestRun is not None:
# startTestRun()
# BACK: Not needed for Python < 2.7
# unittest.addCleanup does not exist yet
# self._resultForDoCleanups = result
result.startTest(self)
if getattr(self.__class__, "__unittest_skip__", False):
# If the whole class was skipped.
try:
result.addSkip(self, self.__class__.__unittest_skip_why__)
finally:
result.stopTest(self)
return
testMethod = getattr(self, self._testMethodName)
try:
success = False
try:
self.setUp()
except SkipTest as e:
result.addSkip(self, str(e))
except SetupSiteError as e:
result.errors.append(None)
except BaseException as e:
result.addError(self, sys.exc_info())
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
else:
try:
testMethod()
except self.failureException:
result.addFailure(self, sys.exc_info())
except _ExpectedFailure as e:
result.addExpectedFailure(self, e.exc_info)
except _UnexpectedSuccess:
result.addUnexpectedSuccess(self)
except SkipTest as e:
result.addSkip(self, str(e))
except BaseException as e:
result.addError(self, sys.exc_info())
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
else:
success = True
try:
self.tearDown()
except BaseException as e:
result.addError(self, sys.exc_info())
if isinstance(e, (KeyboardInterrupt, SystemExit)):
raise
success = False
# BACK: Not needed for Python < 2.7
# unittest.addCleanup does not exist yet
# cleanUpSuccess = self.doCleanups()
# success = success and cleanUpSuccess
if success:
result.addSuccess(self)
finally:
result.stopTest(self)
# BACK: Not necessary for Python < 2.7
# TestResult.stopTestRun does not exist yet
# if orig_result is None:
# stopTestRun = getattr(result, 'stopTestRun', None)
# if stopTestRun is not None:
# stopTestRun()
def skipTest(self, reason):
"""Skip this test."""
raise SkipTest(reason)
if not hasattr(unittest.TestResult, 'addSkip'): # BBB: Python < 2.7
unittest.TestResult._orig_init__ = unittest.TestResult.__init__.im_func
def __init__(self):
self._orig_init__()
self.skipped = []
self.expectedFailures = []
self.unexpectedSuccesses = []
def addSkip(self, test, reason):
"""Called when a test is skipped."""
self.skipped.append((test, reason))
if self.showAll:
self.stream.writeln("skipped %s" % repr(reason))
elif self.dots:
self.stream.write("s")
self.stream.flush()
def addExpectedFailure(self, test, err):
"""Called when an expected failure/error occured."""
self.expectedFailures.append(
(test, self._exc_info_to_string(err, test)))
if self.showAll:
self.stream.writeln("expected failure")
elif self.dots:
self.stream.write("x")
self.stream.flush()
def addUnexpectedSuccess(self, test):
"""Called when a test was expected to fail, but succeed."""
self.unexpectedSuccesses.append(test)
if self.showAll:
self.stream.writeln("unexpected success")
elif self.dots:
self.stream.write("u")
self.stream.flush()
for f in __init__, addSkip, addExpectedFailure, addUnexpectedSuccess:
setattr(unittest.TestResult, f.__name__, f)
def getDescription(self, test):
doc_first_line = test.shortDescription()
if self.descriptions and doc_first_line:
return '\n'.join((str(test), doc_first_line))
else:
return str(test)
unittest._TextTestResult.getDescription = getDescription
class _TextTestResult(unittest._TextTestResult):
def wasSuccessful(self):
"Tells whether or not this result was a success"
return not (self.failures or self.errors or self.unexpectedSuccesses)
def printErrors(self):
if self.dots or self.showAll:
self.stream.writeln()
# 'None' correspond to redundant errors due to site creation errors,
# and we do not display them here.
self.printErrorList('ERROR', filter(None, self.errors))
self.printErrorList('FAIL', self.failures)
if self.unexpectedSuccesses:
self.stream.writeln(self.separator1)
for test in self.unexpectedSuccesses:
self.stream.writeln("SUCCESS: %s" % self.getDescription(test))
class TextTestRunner(unittest.TextTestRunner):
def _makeResult(self):
return _TextTestResult(self.stream, self.descriptions, self.verbosity)
def run(self, test):
result = self._makeResult()
startTime = time.time()
# BACK: 2.7 implementation wraps run with result.(start|stop)TestRun
try:
test(result)
except KeyboardInterrupt:
pass
stopTime = time.time()
timeTaken = stopTime - startTime
result.printErrors()
self.stream.writeln(result.separator2)
run = result.testsRun
self.stream.writeln("Ran %d test%s in %.3fs" %
(run, run != 1 and "s" or "", timeTaken))
self.stream.writeln()
results = map(len, (result.expectedFailures,
result.unexpectedSuccesses,
result.skipped))
expectedFails, unexpectedSuccesses, skipped = results
infos = []
if not result.wasSuccessful():
self.stream.write("FAILED")
failed, errored = map(len, (result.failures, result.errors))
if failed:
infos.append("failures=%d" % failed)
if errored:
infos.append("errors=%d" % errored)
else:
self.stream.write("OK")
if skipped:
infos.append("skipped=%d" % skipped)
if expectedFails:
infos.append("expected failures=%d" % expectedFails)
if unexpectedSuccesses:
infos.append("unexpected successes=%d" % unexpectedSuccesses)
if infos:
self.stream.writeln(" (%s)" % (", ".join(infos),))
else:
self.stream.write("\n")
return result
......@@ -30,23 +30,21 @@
import unittest
from os import environ, path
from ConfigParser import ConfigParser
from xmlrpclib import ServerProxy, Fault
from configparser import ConfigParser
from xmlrpc.client import ServerProxy, Fault
from magic import Magic
from types import DictType
from base64 import encodestring, decodestring
from cloudooo.tests import backportUnittest
from base64 import encodebytes, decodebytes
config = ConfigParser()
def make_suite(test_case):
"""Function is used to run all tests together"""
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_case))
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(test_case))
return suite
class TestCase(backportUnittest.TestCase):
class TestCase(unittest.TestCase):
def setUp(self):
server_cloudooo_conf = environ.get("server_cloudooo_conf", None)
......@@ -58,16 +56,20 @@ class TestCase(backportUnittest.TestCase):
#create temporary path for some files
self.working_path = config.get("app:main", "working_path")
self.tmp_url = path.join(self.working_path, "tmp")
self.proxy = ServerProxy(("http://%s:%s/RPC2" % (self.hostname, self.port)),\
self.proxy = ServerProxy(("http://{}:{}/RPC2".format(self.hostname, self.port)),\
allow_none=True)
self.addCleanup(self.proxy('close'))
self.afterSetUp()
def afterSetUp(self):
"""Must be overwrite into subclass in case of need """
def _getFileType(self, output_data):
def _getFileType(self, output_data:str) -> str:
"""get file type of `output_data`
output_data is base64
"""
mime = Magic(mime=True)
mimetype = mime.from_buffer(decodestring(output_data))
mimetype = mime.from_buffer(decodebytes(output_data.encode()))
return mimetype
def _testConvertFile(self, input_url, source_format, destination_format,
......@@ -75,10 +77,13 @@ class TestCase(backportUnittest.TestCase):
""" Generic test for converting file """
fault_list = []
try:
output_data = self.proxy.convertFile(encodestring(open(input_url).read()),
source_format,
destination_format,
zip)
with open(input_url, 'rb') as f:
data = f.read()
output_data = self.proxy.convertFile(
encodebytes(data).decode(),
source_format,
destination_format,
zip)
file_type = self._getFileType(output_data)
if destination_mimetype != None:
self.assertEqual(file_type, destination_mimetype)
......@@ -92,47 +97,51 @@ class TestCase(backportUnittest.TestCase):
message = '\n'.join([template_message % fault for fault in fault_list])
self.fail('Failed Conversions:\n' + message)
def _testFaultConversion(self, data, source_format, destination_format):
def _testFaultConversion(self, data:bytes, source_format:str, destination_format:str):
""" Generic test for fail converting"""
self.assertRaises(Fault, self.proxy.convertFile, (data,
self.assertRaises(Fault, self.proxy.convertFile, (data,
source_format,
destination_format))
def _testGetMetadata(self, input_url, source_format, expected_metadata,
def _testGetMetadata(self, input_url:str, source_format:str, expected_metadata:dict,
base_document=False):
""" Generic tes for getting metadata file"""
with open(input_url, 'rb') as f:
input_data = f.read()
metadata_dict = self.proxy.getFileMetadataItemList(
encodestring(open(input_url).read()),
encodebytes(input_data).decode(),
source_format,
base_document)
for key,value in expected_metadata.iteritems():
for key,value in expected_metadata.items():
self.assertEqual(metadata_dict[key], value)
def _testFaultGetMetadata(self, data, source_format):
def _testFaultGetMetadata(self, data:bytes, source_format:str):
""" Generic test for fail converting"""
self.assertRaises(Fault, self.proxy.getFileMetadataItemList, (data,
self.assertRaises(Fault, self.proxy.getFileMetadataItemList, (data,
source_format))
def _testUpdateMetadata(self, input_url, source_format, metadata_dict):
def _testUpdateMetadata(self, input_url:str, source_format:str, metadata_dict:dict):
""" Generic test for setting metadata for file """
output_data = self.proxy.updateFileMetadata(encodestring(open(input_url).read()),
with open(input_url, 'rb') as f:
input_data = f.read()
output_data = self.proxy.updateFileMetadata(encodebytes(input_data).decode(),
source_format,
metadata_dict)
new_metadata_dict = self.proxy.getFileMetadataItemList(
output_data,
source_format,
False)
for key,value in metadata_dict.iteritems():
for key, value in metadata_dict.items():
self.assertEqual(new_metadata_dict[key], value)
def _testRunConvert(self, filename, data, expected_response_code,
response_dict_data,response_dict_keys,
def _testRunConvert(self, filename:str, data:bytes, expected_response_code:int,
response_dict_data, response_dict_keys:list[str],
expected_response_message, response_dict_meta=None):
"""Generic test for run_convert"""
response_code, response_dict, response_message = \
self.proxy.run_convert(filename, encodestring(data))
self.proxy.run_convert(filename, encodebytes(data).decode())
self.assertEqual(response_code, expected_response_code)
self.assertEqual(type(response_dict), DictType)
self.assertIsInstance(response_dict, dict)
if expected_response_code == 402:
self.assertEqual(response_dict, {})
self.assertTrue(response_message.endswith(expected_response_message),
......@@ -152,7 +161,8 @@ class TestCase(backportUnittest.TestCase):
def runConversionList(self, scenarios):
for scenario in scenarios:
self._testConvertFile(*scenario)
with self.subTest(scenario):
self._testConvertFile(*scenario)
def GetMetadataScenarioList(self):
"""
......@@ -163,7 +173,8 @@ class TestCase(backportUnittest.TestCase):
def runGetMetadataList(self, scenarios):
for scenario in scenarios:
self._testGetMetadata(*scenario)
with self.subTest(scenario):
self._testGetMetadata(*scenario)
def UpdateMetadataScenarioList(self):
"""
......@@ -174,7 +185,8 @@ class TestCase(backportUnittest.TestCase):
def runUpdateMetadataList(self, scenarios):
for scenario in scenarios:
self._testUpdateMetadata(*scenario)
with self.subTest(scenario):
self._testUpdateMetadata(*scenario)
def FaultConversionScenarioList(self):
"""
......@@ -185,7 +197,8 @@ class TestCase(backportUnittest.TestCase):
def runFaultConversionList(self, scenarios):
for scenario in scenarios:
self._testFaultConversion(*scenario)
with self.subTest(scenario):
self._testFaultConversion(*scenario)
def FaultGetMetadataScenarioList(self):
"""
......@@ -196,7 +209,8 @@ class TestCase(backportUnittest.TestCase):
def runFaultGetMetadataList(self, scenarios):
for scenario in scenarios:
self._testFaultGetMetadata(*scenario)
with self.subTest(scenario):
self._testFaultGetMetadata(*scenario)
def ConvertScenarioList(self):
"""
......@@ -207,5 +221,6 @@ class TestCase(backportUnittest.TestCase):
def runConvertScenarioList(self, scenarios):
for scenario in scenarios:
self._testRunConvert(*scenario)
with self.subTest(scenario):
self._testRunConvert(*scenario)
......@@ -32,7 +32,7 @@
import unittest
import sys
from os import environ, path, mkdir, putenv
from ConfigParser import ConfigParser
from configparser import ConfigParser
from cloudooo.handler.ooo.application.openoffice import openoffice
from cloudooo.handler.ooo.mimemapper import mimemapper
......@@ -42,7 +42,7 @@ config = ConfigParser()
def make_suite(test_case):
"""Function is used to run all tests together"""
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(test_case))
suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(test_case))
return suite
......@@ -84,8 +84,8 @@ def startFakeEnvironment(start_openoffice=True, conf_path=None):
if start_openoffice:
default_language = config.get('app:main',
'openoffice_user_interface_language', False,
{'openoffice_user_interface_language': 'en'})
'openoffice_user_interface_language', raw=False,
vars={'openoffice_user_interface_language': 'en'})
openoffice.loadSettings(hostname,
openoffice_port,
working_path,
......@@ -110,27 +110,6 @@ def stopFakeEnvironment(stop_openoffice=True):
openoffice.stop()
return True
if 1:
from cloudooo.handler.ooo.application.openoffice import OpenOffice
from cloudooo.handler.ooo.util import waitStartDaemon, waitStopDaemon
from subprocess import Popen, PIPE
# patch OpenOffice._startProcess not to put bogus output to stderr,
# that prevents detecting the end of unit test.
def _startProcess(self, command, env):
"""Start OpenOffice.org process"""
for i in range(5):
self.stop()
waitStopDaemon(self, self.timeout)
self.process = Popen(command, stderr=PIPE,
close_fds=True,
env=env)
if not waitStartDaemon(self, self.timeout):
continue
if self._testOpenOffice(self.hostname, self.port):
return
OpenOffice._startProcess = _startProcess
class HandlerTestCase(unittest.TestCase):
"""Test Case to load cloudooo conf."""
......
......@@ -2,13 +2,13 @@
import sys
from pkg_resources import resource_filename
import logging
import unittest
from time import sleep
from subprocess import Popen
from ConfigParser import ConfigParser
from configparser import ConfigParser
from argparse import ArgumentParser
from os import chdir, path, environ, curdir, remove
from cloudooo.tests import backportUnittest
from glob import glob
import psutil
from signal import SIGQUIT
......@@ -23,10 +23,7 @@ def wait_use_port(pid, timeout_limit=30):
return False
def exit(msg):
sys.stderr.write(msg)
sys.exit(0)
logger = logging.getLogger(__name__)
def run():
description = "Unit Test Runner for Handlers"
......@@ -34,12 +31,18 @@ def run():
parser.add_argument('server_cloudooo_conf')
parser.add_argument('test_name')
parser.add_argument('--timeout_limit', dest='timeout_limit',
type=long, default=30,
type=int, default=30,
help="Timeout to waiting for the cloudooo stop")
parser.add_argument('--paster_path', dest='paster_path',
default='paster',
help="Path to Paster script")
parser.add_argument('-v', '--verbose', action='store_true', help='Enable logging')
parser.add_argument(
'-D', '--debug', action='store_true',
help='Enable pdb on errors/failures') # XXX but does not show test output
namespace = parser.parse_args()
if namespace.verbose:
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)-8s %(message)s',)
environment_path = glob(path.join(resource_filename("cloudooo", "handler"), '*', 'tests'))
sys.path.extend(environment_path)
server_cloudooo_conf = namespace.server_cloudooo_conf
......@@ -50,32 +53,38 @@ def run():
environ['server_cloudooo_conf'] = server_cloudooo_conf
paster_path = namespace.paster_path
python_extension = '.py'
if test_name[-3:] == python_extension:
test_name = test_name[:-3]
handler_path = None
for env_handler_path in environment_path:
full_path = path.join(env_handler_path, '%s%s' % (test_name,
python_extension))
if path.exists(full_path):
handler_path = env_handler_path
break
if handler_path is None:
exit("%s does not exists\n" % full_path)
from cloudooo.tests.handlerTestCase import startFakeEnvironment
from cloudooo.tests.handlerTestCase import stopFakeEnvironment
config = ConfigParser()
config.read(server_cloudooo_conf)
module = __import__(test_name)
if namespace.debug:
# XXX not really correct but enough to get a pdb prompt
suite = unittest.defaultTestLoader.loadTestsFromName(test_name)
module = __import__(list(suite)[0].__module__)
if module is unittest:
module = __import__(list(list(suite)[0])[0].__module__)
else:
module = __import__(test_name)
suite = unittest.defaultTestLoader.loadTestsFromModule(module)
handler_path = path.dirname(module.__file__)
DAEMON = getattr(module, 'DAEMON', False)
OPENOFFICE = getattr(module, 'OPENOFFICE', False)
TestRunner = backportUnittest.TextTestRunner
suite = unittest.defaultTestLoader.loadTestsFromModule(module)
def run_suite():
if namespace.debug:
import functools
suite.run = functools.partial(suite.run, debug=True)
try:
unittest.TextTestRunner(
verbosity=2,
warnings=None if sys.warnoptions else 'default',
).run(suite)
except:
import pdb; pdb.post_mortem()
raise
if DAEMON:
log_file = '%s/cloudooo_test.log' % config.get('app:main',
......@@ -85,20 +94,24 @@ def run():
command = [paster_path, 'serve', '--log-file', log_file,
server_cloudooo_conf]
process = Popen(command)
logger.debug("Started daemon %s", command)
wait_use_port(process.pid)
logger.debug("Daemon ready")
chdir(handler_path)
try:
TestRunner(verbosity=2).run(suite)
run_suite()
finally:
process.send_signal(SIGQUIT)
process.wait()
elif OPENOFFICE:
chdir(handler_path)
logger.debug("Starting fake environment")
startFakeEnvironment(conf_path=server_cloudooo_conf)
logger.debug("Fake environment ready")
try:
TestRunner(verbosity=2).run(suite)
run_suite()
finally:
stopFakeEnvironment()
else:
chdir(handler_path)
TestRunner(verbosity=2).run(suite)
run_suite()
......@@ -39,7 +39,7 @@ from erp5.util.testsuite import SubprocessError
from erp5.util import taskdistribution
def _parsingErrorHandler(data, _):
print >> sys.stderr, 'Error parsing data:', repr(data)
print('Error parsing data:', repr(data), file=sys.stderr)
taskdistribution.patchRPCParser(_parsingErrorHandler)
class TestSuite(BaseTestSuite):
......@@ -115,7 +115,7 @@ def run():
if test_result is not None:
assert revision == test_result.revision, (revision, test_result.revision)
while suite.acquire():
test = test_result.start(suite.running.keys())
test = test_result.start(list(suite.running.keys()))
if test is not None:
suite.start(test.name, lambda status_dict, __test=test:
__test.stop(**status_dict))
......
......@@ -28,12 +28,12 @@
#
##############################################################################
import email
import email.message
import logging
import mimetypes
import pkg_resources
import os
import mimetools
import cStringIO
from zipfile import ZipFile, ZIP_DEFLATED
logger = logging.getLogger('Cloudooo')
......@@ -96,7 +96,7 @@ convertStringToBool = ('false', 'true').index
def zipTree(destination, *tree_path_list):
"""
destination may be a path or a StringIO
destination may be a path or a file-like
tree_path_list is a list that may contain a path or a couple(path, archive_root)
"""
......@@ -126,17 +126,13 @@ def unzip(source, destination):
with ZipFile(source) as zipfile:
zipfile.extractall(destination)
def parseContentType(content_type):
"""Parses `text/plain;charset="utf-8"` to a mimetools.Message object.
def parseContentType(content_type:str) -> email.message.EmailMessage:
"""Parses `text/plain;charset="utf-8"` to an email object.
Note: Content type or MIME type are built like `maintype/subtype[;params]`.
parsed_content_type = parseContentType('text/plain;charset="utf-8"')
parsed_content_type.gettype() -> 'text/plain'
parsed_content_type.getmaintype() -> 'text'
parsed_content_type.getsubtype() -> 'plain'
parsed_content_type.getplist() -> 'charset="utf-8"'
parsed_content_type.getparam('charset') -> 'utf-8'
parsed_content_type.typeheader -> 'text/plain;charset="utf-8"'
parsed_content_type.get_content_type() -> 'text/plain'
parsed_content_type.get_charset() -> 'utf-8'
"""
return mimetools.Message(cStringIO.StringIO("Content-Type:" + content_type.replace("\r\n", "\r\n\t")))
return email.message_from_string("Content-Type:" + content_type.replace("\r\n", "\r\n\t"))
......@@ -12,16 +12,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from SimpleXMLRPCServer import SimpleXMLRPCDispatcher
import logging
from xmlrpc.server import SimpleXMLRPCDispatcher
logger = logging.getLogger(__name__)
class WSGIXMLRPCApplication(object):
class ErrorLoggingXMLRPCDispatcher(SimpleXMLRPCDispatcher):
"""A XMLRPC Dispatcher which logs errors
"""
def _dispatch(self, method, params):
try:
return super()._dispatch(method, params)
except:
logger.exception("Error calling %s", method)
raise
class WSGIXMLRPCApplication:
"""Application to handle requests to the XMLRPC service"""
def __init__(self, instance=None, methods=[]):
"""Create windmill xmlrpc dispatcher"""
self.dispatcher = SimpleXMLRPCDispatcher(allow_none=True,
encoding=None)
self.dispatcher = ErrorLoggingXMLRPCDispatcher(
allow_none=True,
encoding=None)
if instance is not None:
self.dispatcher.register_instance(instance)
for method in methods:
......@@ -34,7 +48,7 @@ class WSGIXMLRPCApplication(object):
return self.handle_POST(environ, start_response)
else:
start_response("400 Bad request", [('Content-Type', 'text/plain')])
return ['']
return [b'']
def handle_POST(self, environ, start_response):
"""Handles the HTTP POST request.
......@@ -60,9 +74,10 @@ class WSGIXMLRPCApplication(object):
response = self.dispatcher._marshaled_dispatch(
data, getattr(self.dispatcher, '_dispatch', None)
)
response += '\n'
response += b'\n'
except: # This should only happen if the module is buggy
# internal error, report as HTTP server error
logger.exception("Error serving request")
start_response("500 Server error", [('Content-Type',
'text/plain')])
return []
......
......@@ -12,7 +12,7 @@ install_requires = [
'zope.interface',
'PasteDeploy',
'PasteScript[WSGIUtils]',
'pyPdf',
'pyPdf>=3.6.0',
'psutil>=3.0.0',
'lxml',
'python-magic',
......@@ -28,7 +28,7 @@ setup(name='cloudooo',
"Topic :: System :: Networking",
"Topic :: System :: Operating System Kernels :: Linux",
"Topic :: Internet :: WWW/HTTP :: WSGI",
"Programming Language :: Python :: 2.6",
"Programming Language :: Python :: 3",
"Natural Language :: English",
"Framework :: Paste"],
keywords='xmlrpc openoffice wsgi paste python',
......@@ -40,6 +40,7 @@ setup(name='cloudooo',
include_package_data=True,
zip_safe=False,
install_requires=install_requires,
python_requires='>=3.8',
entry_points="""
[paste.app_factory]
main = cloudooo.paster_application:application
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment