Commit bc6b1680 authored by Jérome Perrin's avatar Jérome Perrin

Merge remote-tracking branch 'origin/master' into zope4py2

parents 8553a8c0 eaae74a0
......@@ -355,7 +355,9 @@ class BalanceTransaction(AccountingTransaction, Inventory):
# matching_diff are negated later
if matching_diff:
matching_diff['quantity'] -= round(new_stock['quantity'], precision)
matching_diff['quantity'] = round(
matching_diff['quantity'] - new_stock['quantity'],
precision)
# Matching_diff and new_stock must be consistent.
# both with total price or none.
if matching_diff['total_price'] and new_stock['total_price']:
......
......@@ -25,6 +25,7 @@
#
##############################################################################
import json
import os.path
import tempfile
import textwrap
......@@ -572,14 +573,6 @@ class TestRestrictedPythonSecurity(ERP5TypeTestCase):
)
def testPandasIORead(self):
self.assertRaises(
Unauthorized,
self.createAndRunScript,
'''
import pandas as pd
pd.read_csv('testPandasIORead.csv')
''')
# Test the black_list configuration validity
for read_method in pandas_black_list:
self.assertRaises(
......@@ -635,6 +628,148 @@ class TestRestrictedPythonSecurity(ERP5TypeTestCase):
write_method('testPandasSeriesIOWrite.data')
'''.format(write_method=write_method))
def _assertPandasRestrictedReadFunctionIsEqualTo(
self, read_function, read_argument, expected_data_frame_init
):
self.createAndRunScript(
'''
import pandas as pd
expected_data_frame = pd.DataFrame({expected_data_frame_init})
return pd.{read_function}({read_argument}).equals(expected_data_frame)
'''.format(
expected_data_frame_init=expected_data_frame_init,
read_function=read_function,
read_argument=read_argument,
),
expected=True
)
def testPandasRestrictedReadFunctionProhibitedInput(self):
"""
Test if patched pandas read_* functions raise with any input which isn't a string.
"""
for pandas_read_function in ("read_json", "read_csv", "read_fwf"):
for preparation, prohibited_input in (
('', 100),
('from StringIO import StringIO', 'StringIO("[1, 2, 3]")'),
):
self.assertRaises(
ZopeGuardsUnauthorized,
self.createAndRunScript,
'''
import pandas as pd
{preparation}
pd.{pandas_read_function}({prohibited_input})
'''.format(
preparation=preparation,
pandas_read_function=pandas_read_function,
prohibited_input=prohibited_input,
)
)
def testPandasReadFwf(self):
read_function = "read_fwf"
# Normal input should be correctly handled
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function, r'"100\n200"', r"[[200]], columns=['100']",
)
# Ensure monkey patch parses keyword arguments to patched function
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function, r'"1020\n3040", widths=[2, 2]', r"[[30, 40]], columns=['10', '20']",
)
# A string containing an url or file path should be handled as if
# it would be a normal csv string entry
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
r'"file://path/to/fwf/file.fwf"',
r"[], columns=['file://path/to/fwf/file.fwf']",
)
def testPandasReadCSV(self):
read_function = "read_csv"
# Normal input should be correctly handled
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
r'"11,2,300\n50.5,99,hello"',
r"[[50.5, 99, 'hello']], columns='11 2 300'.split(' ')",
)
# Ensure monkey patch parses keyword arguments to patched function
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function, r'"a;b", sep=";"', r"[], columns=['a', 'b']",
)
# A string containing an url or file path should be handled as if
# it would be a normal csv string entry
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
r'"https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv"',
r"[], columns=['https://people.sc.fsu.edu/~jburkardt/data/csv/addresses.csv']",
)
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
r'"file://path/to/csv/file.csv"',
r"[], columns=['file://path/to/csv/file.csv']",
)
def testPandasReadJsonParsesInput(self):
read_function = "read_json"
# Normal input should be correctly handled
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function, '"[1, 2, 3]"', "[1, 2, 3]"
)
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
'\'{"column_name": [1, 2, 3], "another_column": [3, 9.2, 100]}\'',
'{"column_name": [1, 2, 3], "another_column": [3, 9.2, 100]}',
)
# Ensure monkey patch parses keyword arguments to patched function
self._assertPandasRestrictedReadFunctionIsEqualTo(
read_function,
r'"[1, 2, 3]\n[4, 5, 6]", lines=True',
"[[1, 2, 3], [4, 5, 6]]",
)
# URLs, etc. should raise a ValueError
# (see testPandasReadJsonProhibitsMalicousString)
def testPandasReadJsonProhibitsMalicousString(self):
"""
Test if file path, urls and other bad strings
raise value errors
"""
# Create valid json file which could be read
# by a non-patched read_json function.
test_file_path = ".testPandasReadJson.json"
json_test_data = [1, 2, 3]
with open(test_file_path, 'w') as json_file:
json.dump(json_test_data, json_file)
self.addCleanup(os.remove, test_file_path)
# Ensure json creation was successful
self.assertTrue(os.path.isfile(test_file_path))
with open(test_file_path, "r") as json_file:
self.assertEqual(json_test_data, json.loads(json_file.read()))
for malicous_input in (
# If pandas would read this as an URL it should
# raise an URLError. But because it will try
# to read it as a json string, it will raise
# a ValueError.
"https://test-url.com/test-name.json",
"file://path/to/json/file.json",
# This shouldn't raise any error in case
# pandas read function wouldn't be patched.
test_file_path,
# Gibberish should also raise a ValueError
"Invalid-string"
):
self.assertRaises(
ValueError,
self.createAndRunScript,
'''
import pandas as pd
pd.read_json("{}")
'''.format(malicous_input)
)
def test_suite():
suite = unittest.TestSuite()
......
......@@ -32,7 +32,7 @@
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_rss_box_url</td>
</tr>
<tr>
......@@ -42,7 +42,7 @@
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_rss_box_id</td>
</tr>
<span metal:use-macro="container/Zuite_CommonTemplate/macros/wait_for_activities">Wait for activities</span>
......
......@@ -33,7 +33,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_web_section_random_page_box_url</td>
</tr>
<tr>
......@@ -43,7 +43,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_web_section_random_page_box_id</td>
</tr>
......@@ -55,7 +55,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_persons_box_url</td>
</tr>
<tr>
......@@ -65,7 +65,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_persons_box_id</td>
</tr>
......
......@@ -62,12 +62,12 @@
</tr>
<tr>
<td>waitForText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>regexp:knowledge_pad_module/.*</td>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_persons_url</td>
</tr>
<tr>
......@@ -77,7 +77,7 @@
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>erp5_persons_id</td>
</tr>
<tr>
......
......@@ -36,7 +36,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>worklist_box_url</td>
</tr>
<tr>
......@@ -46,7 +46,7 @@ Wait for activities</span>
</tr>
<tr>
<td>storeText</td>
<td>transition_message</td>
<td>css=div.transition_message</td>
<td>worklist_box_id</td>
</tr>
......
......@@ -164,7 +164,7 @@ class Alarm(XMLObject, PeriodicityMixin):
activate_kw['tag'] = '%s_%x' % (self.getRelativeUrl(), getrandbits(32))
tag = activate_kw['tag']
method = getattr(self, method_id)
func_code = method.__code__
func_code = method.func_code
try:
has_kw = func_code.co_flags & CO_VARKEYWORDS
except AttributeError:
......
......@@ -324,12 +324,8 @@ class SimulationTool(BaseTool):
output_simulation_state = [output_simulation_state]
sql_kw['output_simulation_state'] = output_simulation_state
# XXX In this case, we must not set sql_kw[input_simumlation_state] before
input_simulation_state = None
output_simulation_state = None
if 'input_simulation_state' in sql_kw:
input_simulation_state = sql_kw.get('input_simulation_state')
if 'output_simulation_state' in sql_kw:
output_simulation_state = sql_kw.get('output_simulation_state')
input_simulation_state = sql_kw.get('input_simulation_state')
output_simulation_state = sql_kw.get('output_simulation_state')
if input_simulation_state is not None \
or output_simulation_state is not None:
sql_kw.pop('input_simulation_state',None)
......@@ -2190,12 +2186,8 @@ class SimulationTool(BaseTool):
new_kw['date_condition_in_join'] = not (new_kw.get('input') or new_kw.get('output'))
# Pass simulation state to request
if next_item_simulation_state:
new_kw['simulation_state_list'] = next_item_simulation_state
elif 'item.simulation_state' in kw:
new_kw['simulation_state_list'] = kw['item.simulation_state']
else:
new_kw['simulation_state_list'] = None
new_kw['simulation_state_list'] = next_item_simulation_state or \
kw.get('item.simulation_state')
return self.Resource_zGetTrackingList(src__=src__,
**new_kw)
......
......@@ -41,7 +41,7 @@ class Accessor(Method):
def __getinitargs__(self):
init = getattr(self, '__init__', None)
if init is not None:
varnames = init.__code__.co_varnames
varnames = init.func_code.co_varnames
args = []
for name in varnames:
if name == 'self':
......
......@@ -181,7 +181,7 @@ class Predicate(XMLObject):
try:
result = method(self)
except TypeError:
if method.__code__.co_argcount != isinstance(method, MethodType):
if method.func_code.co_argcount != isinstance(method, MethodType):
raise
# backward compatibilty with script that takes no argument
warn('Predicate %s uses an old-style method (%s) that does not'
......
......@@ -71,8 +71,8 @@ class InteractorMethod(Method):
self.after_action_list = []
self.before_action_list = []
self.method = method
self.__code__ = self.func_code = method.__code__
self.__defaults__ = self.func_defaults = method.__defaults__
self.func_code = method.func_code
self.func_defaults = method.func_defaults
self.__name__ = method.__name__
def registerBeforeAction(self, action, args, kw):
......
##############################################################################
#
# Copyright (c) 2012 Nexedi SARL and Contributors. All Rights Reserved.
# Levin Zimmermann <levin.zimmermann@nexedi.com>
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsability of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# garantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
"""
Restricted pandas module.
From restricted python, use "import pandas" (see patches/Restricted.py).
"""
from pandas import *
# Add restricted versions of IO functions
import six as _six
from AccessControl.ZopeGuards import Unauthorized as _ZopeGuardsUnauthorized
if _six.PY2:
from StringIO import StringIO as _StringIO
else:
from io import StringIO as _StringIO
def _addRestrictedPandasReadFunction(function_name):
original_function = getattr(__import__('pandas'), function_name)
def Pandas_read(data_string, *args, **kwargs):
# Strict: don't use 'isinstance', only allow buildin str
# objects
if type(data_string) is not str:
raise _ZopeGuardsUnauthorized(
"Parsing object '%s' of type '%s' is prohibited!" % (data_string, type(data_string))
)
string_io = _StringIO(data_string)
return original_function(string_io, *args, **kwargs)
disclaimer = """\n
Disclaimer:
This function has been patched by ERP5 for zope sandbox usage.
Only objects of type 'str' are valid inputs, file paths, files,
urls, etc. are prohibited or ignored.
"""
Pandas_read.__doc__ = original_function.__doc__ + disclaimer
globals().update({function_name: Pandas_read})
def _addRestrictedPandasReadFunctionTuple():
pandas_read_function_to_restrict_tuple = (
"read_json",
# "read_html", # needs installation of additional dependency: html5lib
"read_csv",
"read_fwf",
# "read_xml", # only available for pandas version >= 1.3.0
)
for pandas_read_function_to_restrict in pandas_read_function_to_restrict_tuple:
_addRestrictedPandasReadFunction(pandas_read_function_to_restrict)
_addRestrictedPandasReadFunctionTuple()
\ No newline at end of file
......@@ -234,7 +234,7 @@ def deprecated(message=''):
@simple_decorator
def _deprecated(wrapped):
m = message or "Use of '%s' function (%s, line %s) is deprecated." % (
wrapped.__name__, wrapped.__module__, wrapped.__code__.co_firstlineno)
wrapped.__name__, wrapped.__module__, wrapped.func_code.co_firstlineno)
def deprecated(*args, **kw):
warnings.warn(m, DeprecationWarning, 2)
return wrapped(*args, **kw)
......
......@@ -84,9 +84,7 @@ from Products.ERP5Type.patches import zopecontenttype
from Products.ERP5Type.patches import OFSImage
from Products.ERP5Type.patches import _transaction
from Products.ERP5Type.patches import default_zpublisher_encoding
if six.PY2:
# DCWorkflowGraph is dead since 2011, so no py3 version
from Products.ERP5Type.patches import DCWorkflowGraph
from Products.ERP5Type.patches import DCWorkflowGraph
from Products.ERP5Type.patches import SourceCodeEditorZMI
from Products.ERP5Type.patches import CachingPolicyManager
from Products.ERP5Type.patches import AcceleratedHTTPCacheManager
......
......@@ -28,167 +28,174 @@
#
##############################################################################
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type import Permissions
# Products.DCWorkflowGraph.config does not check the return value of
# getenv('PATH'). This fails if PATH is not defined which is the case when
# running ZEO with SlapOS for example. But, Products.DCWorkflowGraph.__init__
# imports Products.DCWorkflowGraph.config as a side-effect of importing
# getGraph, so the only solution is to create a Module which will hide the
# one from DCWorkflowGraph
from types import ModuleType
dc_workflow_config_module = ModuleType('Products.DCWorkflowGraph.config')
import sys
sys.modules['Products.DCWorkflowGraph.config'] = dc_workflow_config_module
# where is 'pot'?, add your path here
import os
DOT_EXE = 'dot'
bin_search_path = []
if os.name == 'nt':
DOT_EXE = 'dot.exe'
# patch from Joachim Bauch bauch@struktur.de
# on Windows, the path to the ATT Graphviz installation
# is read from the registry.
try:
import win32api, win32con
# make sure that "key" is defined in our except block
key = None
try:
key = win32api.RegOpenKeyEx(win32con.HKEY_LOCAL_MACHINE, r'SOFTWARE\ATT\Graphviz')
value, type = win32api.RegQueryValueEx(key, 'InstallPath')
bin_search_path = [os.path.join(str(value), 'bin')]
except:
if key: win32api.RegCloseKey(key)
# key doesn't exist
pass
except ImportError:
# win32 may be not installed...
pass
try:
import Products.DCWorkflowGraph
except ImportError:
pass
else:
# for posix systems
DOT_EXE = 'dot'
path = os.getenv("PATH")
if path is not None:
bin_search_path = path.split(":")
dc_workflow_config_module.bin_search_path = bin_search_path
dc_workflow_config_module.DOT_EXE = DOT_EXE
def getObjectTitle(obj, REQUEST=None):
"""
Get a state/transition title to be displayed in the graph.
Monkey-patched to support translation similar to what
Products.ERP5Type.Accessor.WorkflowState.TranslatedGetter does
"""
if REQUEST is not None:
only_ids = REQUEST.get('only_ids', False)
translate = REQUEST.get('translate', False)
else:
only_ids = False
translate = False
_id = obj.getId()
title = obj.title
if not title or only_ids:
title = _id
# BBB keep Products.DCWorkflowGraph patch for a while as it solves a security issue
from AccessControl import ClassSecurityInfo
from Products.ERP5Type.Globals import InitializeClass
from Products.ERP5Type import Permissions
# Products.DCWorkflowGraph.config does not check the return value of
# getenv('PATH'). This fails if PATH is not defined which is the case when
# running ZEO with SlapOS for example. But, Products.DCWorkflowGraph.__init__
# imports Products.DCWorkflowGraph.config as a side-effect of importing
# getGraph, so the only solution is to create a Module which will hide the
# one from DCWorkflowGraph
from types import ModuleType
dc_workflow_config_module = ModuleType('Products.DCWorkflowGraph.config')
import sys
sys.modules['Products.DCWorkflowGraph.config'] = dc_workflow_config_module
# where is 'pot'?, add your path here
import os
DOT_EXE = 'dot'
bin_search_path = []
if os.name == 'nt':
DOT_EXE = 'dot.exe'
# patch from Joachim Bauch bauch@struktur.de
# on Windows, the path to the ATT Graphviz installation
# is read from the registry.
try:
import win32api, win32con
# make sure that "key" is defined in our except block
key = None
try:
key = win32api.RegOpenKeyEx(win32con.HKEY_LOCAL_MACHINE, r'SOFTWARE\ATT\Graphviz')
value, type = win32api.RegQueryValueEx(key, 'InstallPath')
bin_search_path = [os.path.join(str(value), 'bin')]
except:
if key: win32api.RegCloseKey(key)
# key doesn't exist
pass
except ImportError:
# win32 may be not installed...
pass
else:
if translate:
# Translate the title in all supported Localizer languages
wf_id = obj.getWorkflow().id
localizer = obj.Localizer
original_title = title
for lang in localizer.get_supported_languages():
msg_id = '%s [state in %s]' % (title, wf_id)
translated_title = localizer.erp5_ui.gettext(
msg_id,
lang=lang,
# Fallback on non-workflow state translation
default=localizer.erp5_ui.gettext(original_title,
lang=lang,
default=None))
if (translated_title is not None and
translated_title != original_title):
title += "\\n%s" % translated_title
title += "\\n(%s)"% _id
return title
from Products.DCWorkflowGraph import DCWorkflowGraph
DCWorkflowGraph.getObjectTitle = getObjectTitle
from Products.DCWorkflowGraph.config import bin_search_path, DOT_EXE
from zLOG import LOG, WARNING
import subprocess
def getGraph(self, wf_id="", format="png", REQUEST=None):
"""show a workflow as a graph, copy from:
"OpenFlowEditor":http://www.openflow.it/wwwopenflow/Download/OpenFlowEditor_0_4.tgz
Monkey-patched to specify font name and size as 'dot' uses Times font by
default which does not support Japanese:
http://www.graphviz.org/doc/fontfaq.txt
Another solution would be to modify fontconfig configuration so that Times
match Japanese font or to use Unifont which supports many code points.
"""
try:
pot = self.getPOT(wf_id, REQUEST)
except TypeError:
# DCWorkflowGraph < 0.4
pot = self.getPOT(wf_id)
try:
encoding = self.portal_properties.site_properties.getProperty(
'default_charset', 'utf-8')
except AttributeError:
# no portal_properties or site_properties, fallback to:
encoding = self.management_page_charset.lower()
result = pot.encode(encoding)
if REQUEST is None:
REQUEST = self.REQUEST
setHeader = REQUEST.RESPONSE.setHeader
if format != 'dot':
p = subprocess.Popen((DCWorkflowGraph.bin_search(DOT_EXE),
'-Nfontname=IPAexGothic', '-Nfontsize=10',
'-Efontname=IPAexGothic', '-Efontsize=10',
'-T%s' % format),
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
result = p.communicate(result)[0]
setHeader('Content-Type', 'image/%s' % format)
else:
filename = wf_id or self.getId()
setHeader('Content-Type', 'text/x-graphviz')
setHeader('Content-Disposition', 'attachment; filename=%s.dot' % filename)
if not result:
LOG("ERP5Type.patches.DCWorkflowGraph", WARNING,
"Empty %s graph file" % format)
return result
DCWorkflowGraph.getGraph = getGraph
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
DCWorkflowDefinition.getGraph = getGraph
DCWorkflowDefinition.getPOT = DCWorkflowGraph.getPOT
security = ClassSecurityInfo()
security.declareProtected(Permissions.ManagePortal, 'getPOT')
security.declareProtected(Permissions.ManagePortal, 'getGraph')
DCWorkflowDefinition.security = security
InitializeClass(DCWorkflowDefinition)
# for posix systems
DOT_EXE = 'dot'
path = os.getenv("PATH")
if path is not None:
bin_search_path = path.split(":")
dc_workflow_config_module.bin_search_path = bin_search_path
dc_workflow_config_module.DOT_EXE = DOT_EXE
def getObjectTitle(obj, REQUEST=None):
"""
Get a state/transition title to be displayed in the graph.
Monkey-patched to support translation similar to what
Products.ERP5Type.Accessor.WorkflowState.TranslatedGetter does
"""
if REQUEST is not None:
only_ids = REQUEST.get('only_ids', False)
translate = REQUEST.get('translate', False)
else:
only_ids = False
translate = False
_id = obj.getId()
title = obj.title
if not title or only_ids:
title = _id
else:
if translate:
# Translate the title in all supported Localizer languages
wf_id = obj.getWorkflow().id
localizer = obj.Localizer
original_title = title
for lang in localizer.get_supported_languages():
msg_id = '%s [state in %s]' % (title, wf_id)
translated_title = localizer.erp5_ui.gettext(
msg_id,
lang=lang,
# Fallback on non-workflow state translation
default=localizer.erp5_ui.gettext(original_title,
lang=lang,
default=None))
if (translated_title is not None and
translated_title != original_title):
title += "\\n%s" % translated_title
title += "\\n(%s)"% _id
return title
from Products.DCWorkflowGraph import DCWorkflowGraph
DCWorkflowGraph.getObjectTitle = getObjectTitle
from Products.DCWorkflowGraph.config import bin_search_path, DOT_EXE
from zLOG import LOG, WARNING
import subprocess
def getGraph(self, wf_id="", format="png", REQUEST=None):
"""show a workflow as a graph, copy from:
"OpenFlowEditor":http://www.openflow.it/wwwopenflow/Download/OpenFlowEditor_0_4.tgz
Monkey-patched to fix command injection and specify font name and size as 'dot'
uses Times font by default which does not support Japanese:
http://www.graphviz.org/doc/fontfaq.txt
Another solution would be to modify fontconfig configuration so that Times
match Japanese font or to use Unifont which supports many code points - but we
don't care, this is obsolete code.
"""
try:
pot = self.getPOT(wf_id, REQUEST)
except TypeError:
# DCWorkflowGraph < 0.4
pot = self.getPOT(wf_id)
try:
encoding = self.portal_properties.site_properties.getProperty(
'default_charset', 'utf-8')
except AttributeError:
# no portal_properties or site_properties, fallback to:
encoding = self.management_page_charset.lower()
result = pot.encode(encoding)
if REQUEST is None:
REQUEST = self.REQUEST
setHeader = REQUEST.RESPONSE.setHeader
if format != 'dot':
p = subprocess.Popen((DCWorkflowGraph.bin_search(DOT_EXE),
'-Nfontname=IPAexGothic', '-Nfontsize=10',
'-Efontname=IPAexGothic', '-Efontsize=10',
'-T%s' % format),
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
result = p.communicate(result)[0]
setHeader('Content-Type', 'image/%s' % format)
else:
filename = wf_id or self.getId()
setHeader('Content-Type', 'text/x-graphviz')
setHeader('Content-Disposition', 'attachment; filename=%s.dot' % filename)
if not result:
LOG("ERP5Type.patches.DCWorkflowGraph", WARNING,
"Empty %s graph file" % format)
return result
DCWorkflowGraph.getGraph = getGraph
from Products.DCWorkflow.DCWorkflow import DCWorkflowDefinition
DCWorkflowDefinition.getGraph = getGraph
DCWorkflowDefinition.getPOT = DCWorkflowGraph.getPOT
security = ClassSecurityInfo()
security.declareProtected(Permissions.ManagePortal, 'getPOT')
security.declareProtected(Permissions.ManagePortal, 'getGraph')
DCWorkflowDefinition.security = security
InitializeClass(DCWorkflowDefinition)
......@@ -44,7 +44,7 @@ def volatileCached(self, func):
self._v_SimpleItem_Item_vCache = cache_dict = {}
# Use whole func_code as a key, as it is the only reliable way to identify a
# function.
key = func.__code__
key = func.func_code
try:
return cache_dict[key]
except KeyError:
......
......@@ -434,6 +434,7 @@ MNAME_MAP = {
'calendar': 'Products.ERP5Type.Calendar',
'collections': 'Products.ERP5Type.Collections',
'six': 'Products.ERP5Type.Six',
'pandas': 'Products.ERP5Type.Pandas',
}
for alias, real in six.iteritems(MNAME_MAP):
assert '.' not in alias, alias # TODO: support this
......@@ -532,23 +533,20 @@ def restrictedMethod(s,name):
raise Unauthorized(name)
return dummyMethod
try:
import pandas as pd
except ImportError:
pass
else:
allow_module('pandas')
allow_type(pd.Series)
allow_type(pd.Timestamp)
allow_type(pd.DatetimeIndex)
# XXX: pd.DataFrame has its own security thus disable
# until we can fully integrate it
#allow_type(pd.DataFrame)
allow_type(pd.MultiIndex)
allow_type(pd.indexes.range.RangeIndex)
allow_type(pd.indexes.numeric.Int64Index)
allow_type(pd.core.groupby.DataFrameGroupBy)
allow_type(pd.core.groupby.SeriesGroupBy)
allow_class(pd.DataFrame)
# Note: These black_list methods are for pandas 0.19.2
......@@ -557,10 +555,10 @@ else:
ContainerAssertions[pd.Series] = _check_access_wrapper(
pd.Series, dict.fromkeys(series_black_list, restrictedMethod))
pandas_black_list = ('read_csv', 'read_json', 'read_pickle', 'read_hdf',
'read_fwf', 'read_excel', 'read_html', 'read_msgpack',
pandas_black_list = ('read_pickle', 'read_hdf',
'read_excel', 'read_html', 'read_msgpack',
'read_gbq', 'read_sas', 'read_stata')
ModuleSecurityInfo('pandas').declarePrivate(*pandas_black_list)
ModuleSecurityInfo(MNAME_MAP['pandas']).declarePrivate(*pandas_black_list)
dataframe_black_list = ('to_csv', 'to_json', 'to_pickle', 'to_hdf',
'to_excel', 'to_html', 'to_sql', 'to_msgpack',
......
# -*- coding: utf-8 -*-
##############################################################################
#
# Copyright (c) 2012 Nexedi SA and Contributors. All Rights Reserved.
#
# WARNING: This program as such is intended to be used by professional
# programmers who take the whole responsibility of assessing all potential
# consequences resulting from its eventual inadequacies and bugs
# End users who are looking for a ready-to-use solution with commercial
# guarantees and support are strongly adviced to contract a Free Software
# Service Company
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#
##############################################################################
import os
import glob
import unittest
from Products.ERP5Type.tests.ERP5TypeTestCase import ERP5TypeTestCase
from Products.ERP5.Document.BusinessTemplate import BusinessTemplateMissingDependency
class ReExportBusinessTemplateTest(ERP5TypeTestCase):
"""Reuse test infrastructure to rebuild and export business template.
The business template to export is defined by RE_EXPORT_BUSINESS_TEMPLATE
ReExportERP5BusinessTemplateTestSuite in test/__init__.py can be used
to rebuild all business templates.
"""
def getBusinessTemplateList(self):
return ( self.re_export_business_template, )
def _getBusinessTemplatePathList(self):
from Products.ERP5.ERP5Site import getBootstrapDirectory
bt5_path_list = [
os.environ.get('erp5_tests_bootstrap_path') or getBootstrapDirectory()
]
for path in os.environ['erp5_tests_bt5_path'].split(','):
if os.path.exists(os.path.join(path, "bt5list")):
bt5_path_list.append(path)
for path in glob.glob(os.path.join(path, "*", "bt5list")):
bt5_path_list.append(os.path.dirname(path))
return bt5_path_list
def _installBusinessTemplateList(
self, bt_list, update_repository_bt_list=True, *args, **kwargs):
"""Install dependencies automatically and also install erp5_forge,
which is needed for VCS integration
"""
template_tool = self.portal.portal_templates
from Products.ERP5.ERP5Site import getBootstrapDirectory
bt5_path_list = [os.environ.get('erp5_tests_bootstrap_path') or
getBootstrapDirectory()]
for path in os.environ['erp5_tests_bt5_path'].split(','):
if os.path.exists(os.path.join(path, "bt5list")):
bt5_path_list.append(path)
for path in glob.glob(os.path.join(path, "*", "bt5list")):
bt5_path_list.append(os.path.dirname(path))
template_tool.updateRepositoryBusinessTemplateList(bt5_path_list)
bt_to_install_title_set = set(x[1] for x in bt_list)
bt_to_install_title_set.add('erp5_core')
# Install the business template to rebuild.
try:
url_bt_tuple_list = [
('%s/%s' % (repository, bt_title), bt_title) for repository, bt_title in
template_tool.resolveBusinessTemplateListDependency(
bt_to_install_title_set,
with_test_dependency_list=True)]
except BusinessTemplateMissingDependency as e:
# it may have a virtual dependency on erp5_full_text_catalog, if that's
# the case, we choose erp5_full_text_mroonga_catalog
if str(e).startswith('Unable to resolve dependencies for erp5_full_text_catalog,'):
url_bt_tuple_list = [
('%s/%s' % (repository, bt_title), bt_title) for repository, bt_title in
template_tool.resolveBusinessTemplateListDependency(
bt_to_install_title_set | set(('erp5_full_text_mroonga_catalog',)),
with_test_dependency_list=True)]
if 'erp5_forge' not in bt_to_install_title_set:
try:
upgrader_url_bt_tuple_list = [
('%s/%s' % (repository, bt_title), bt_title) for repository, bt_title in
template_tool.resolveBusinessTemplateListDependency(
bt_to_install_title_set | set(('erp5_forge',)),
# We don't actually run erp5_forge test, so we don't want to install
# erp5_forge test dependencies
with_test_dependency_list=False)]
except BusinessTemplateMissingDependency as e:
if str(e).startswith('Unable to resolve dependencies for erp5_full_text_catalog,'):
upgrader_url_bt_tuple_list = [
('%s/%s' % (repository, bt_title), bt_title) for repository, bt_title in
template_tool.resolveBusinessTemplateListDependency(
bt_to_install_title_set | set(('erp5_forge', 'erp5_full_text_mroonga_catalog',)),
with_test_dependency_list=False)]
for url, bt in upgrader_url_bt_tuple_list:
if bt not in bt_to_install_title_set:
url_bt_tuple_list.append((url, bt))
return super(ReExportBusinessTemplateTest,
self)._installBusinessTemplateList(
url_bt_tuple_list, *args, **kwargs)
def test_re_export_business_template(self):
template_tool = self.portal.portal_templates
pref = self.portal.portal_preferences.newContent(
portal_type='System Preference')
pref.setPreferredWorkingCopyList(self._getBusinessTemplatePathList())
pref.enable()
self.tic()
bt = template_tool.getInstalledBusinessTemplate(
self.re_export_business_template, strict=True)
bt.build()
getattr(bt, 'tree.xml')()
# TODO: do the actual commit from here ? for now we leave the changes in the
# working copy and developer will review and commit the changes.
def test_suite():
suite = unittest.TestSuite()
re_export_business_template = os.environ['RE_EXPORT_BUSINESS_TEMPLATE']
testclass = type(
'ReExportBusinessTemplate %s' % re_export_business_template,
(ReExportBusinessTemplateTest, ),
{
're_export_business_template': re_export_business_template,
},
)
suite.addTest(unittest.makeSuite(testclass))
return suite
......@@ -1452,7 +1452,7 @@ class Catalog(Folder,
if meta_type in self.HAS_ARGUMENT_SRC_METATYPE_SET:
return method.arguments_src.split()
elif meta_type in self.HAS_FUNC_CODE_METATYPE_SET:
return method.__code__.co_varnames[:method.__code__.co_argcount]
return method.func_code.co_varnames[:method.func_code.co_argcount]
# Note: Raising here would completely prevent indexation from working.
# Instead, let the method actually fail when called, so _catalogObjectList
# can log the error and carry on.
......@@ -1838,7 +1838,7 @@ class Catalog(Folder,
else:
search_key = self.getSearchKey(key, 'RelatedKey')
else:
func_code = script.__code__
func_code = script.func_code
search_key = (
AdvancedSearchKeyWrapperForScriptableKey if (
# 5: search_value (under any name), "search_key", "group",
......
......@@ -260,6 +260,24 @@ class ERP5BusinessTemplateCodingStyleTestSuite(_ERP5):
return log_directory
class ReExportERP5BusinessTemplateTestSuite(ERP5TypeTestSuite):
def getTestList(self):
return sorted([
os.path.basename(path)
for path in chain(
glob(HERE + '/../bt5/*'),
glob(HERE + '/../product/ERP5/bootstrap/*'))
if not os.path.exists(path + '/bt/skip_coding_style_test') and os.path.isdir(path)
])
def run(self, full_test):
return self.runUnitTest(
'--portal_id=erp5',
'ReExportBusinessTemplate',
RE_EXPORT_BUSINESS_TEMPLATE=full_test)
class RJS_Only(_ERP5):
def getTestList(self):
rjs_officejs_bt_list = ["erp5_officejs_",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment