Commit d1c9a710 authored by Chris McDonough's avatar Chris McDonough

Created 'configure' API function. The 'configure' API function accepts a config

file path and does for scripts what "start_zope" does for interactive startup.

Use the configure function from within zopectl, replacing some gnarly exec'd code.
parent 969753a7
......@@ -12,16 +12,38 @@
#
##############################################################################
# this function will actually start up a Zope instance
def run():
import App.config
from Zope.Startup import handlers, options, start_zope
""" Start a Zope instance """
from Zope.Startup import start_zope
opts = _setconfig()
start_zope(opts.configroot)
def configure(configfile):
""" Provide an API which allows scripts like zopectl to configure
Zope before attempting to do 'app = Zope.app(). Should be used as
follows: from Zope.Startup.run import configure;
configure('/path/to/configfile'); import Zope; app = Zope.app() """
from Zope.Startup import dropPrivileges
opts = _setconfig(configfile)
dropPrivileges(opts.configroot)
def _setconfig(configfile=None):
""" Configure a Zope instance based on ZopeOptions. Optionally
accept a configfile argument (string path) in order to specify
where the configuration file exists. """
from Zope.Startup import options, handlers
from App import config
opts = options.ZopeOptions()
if configfile:
opts.configfile=configfile
opts.realize(doc="Sorry, no option docs yet.", raise_getopt_errs=0)
else:
opts.realize(doc="Sorry, no option docs yet.")
handlers.handleConfig(opts.configroot, opts.confighandlers)
import App.config
App.config.setConfiguration(opts.configroot)
start_zope(opts.configroot)
return opts
if __name__ == '__main__':
run()
......
......@@ -15,16 +15,15 @@
import os
import cStringIO
import logging
import tempfile
import unittest
import ZConfig
import Zope.Startup
from App.config import getConfiguration
from Zope.Startup import ZopeStarter
from App.config import getConfiguration, setConfiguration
import logging
TEMPNAME = tempfile.mktemp()
TEMPPRODUCTS = os.path.join(TEMPNAME, "Products")
......@@ -40,18 +39,15 @@ def getSchema():
logger_states = {}
for name in ('event', 'trace', 'access'):
logger = logging.getLogger(name)
logger_states[name] = {'level': logger.level,
'propagate': logger.propagate,
'handlers': logger.handlers,
'filters': logger.filters}
logger_states[name] = {'level':logger.level,
'propagate':logger.propagate,
'handlers':logger.handlers,
'filters':logger.filters}
class ZopeStarterTestCase(unittest.TestCase):
schema = None
def setUp(self):
if self.schema is None:
ZopeStarterTestCase.schema = getSchema()
self.schema = getSchema()
self.original_event_logger = logging.getLogger
def tearDown(self):
......@@ -70,7 +66,6 @@ class ZopeStarterTestCase(unittest.TestCase):
# of the directory is checked. This handles this in a
# platform-independent way.
schema = self.schema
text = "instancehome <<INSTANCE_HOME>>\n" + text
sio = cStringIO.StringIO(
text.replace("<<INSTANCE_HOME>>", TEMPNAME))
try:
......@@ -89,7 +84,9 @@ class ZopeStarterTestCase(unittest.TestCase):
import locale
try:
try:
conf = self.load_config_text("locale en_GB")
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
locale en_GB""")
except ZConfig.DataConversionError, e:
# Skip this test if we don't have support.
if e.message.startswith(
......@@ -107,6 +104,7 @@ class ZopeStarterTestCase(unittest.TestCase):
import zLOG
import sys
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
debug-mode on
<eventlog>
level info
......@@ -135,6 +133,7 @@ class ZopeStarterTestCase(unittest.TestCase):
self.assertEqual(len(zLOG.EventLogger.EventLogger.logger.handlers), 1)
self.failUnlessEqual(starter.startup_handler.stream, sys.stderr)
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
debug-mode off
<eventlog>
level info
......@@ -148,7 +147,9 @@ class ZopeStarterTestCase(unittest.TestCase):
self.failIfEqual(starter.startup_handler.stream, sys.stderr)
def testSetupZServerThreads(self):
conf = self.load_config_text("zserver-threads 10")
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
zserver-threads 10""")
starter = ZopeStarter(conf)
starter.setupZServerThreads()
from ZServer.PubCore import _n
......@@ -156,6 +157,7 @@ class ZopeStarterTestCase(unittest.TestCase):
def testSetupServers(self):
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
<http-server>
address 18092
</http-server>
......@@ -175,24 +177,31 @@ class ZopeStarterTestCase(unittest.TestCase):
ZServer.FTPServer)
finally:
del conf.servers # should release servers
pass
def testSetupServersWithConflict(self):
conf = self.load_config_text("""
<http-server>
address 18092
</http-server>
<ftp-server>
# conflict
address 18092
</ftp-server>""")
starter = ZopeStarter(conf)
# do the job the 'handler' would have done (call prepare)
for server in conf.servers:
server.prepare('', None, 'Zope', {}, None)
try:
self.assertRaises(ZConfig.ConfigurationError, starter.setupServers)
finally:
del conf.servers
# The rest sets up a conflict by using the same port for the HTTP
# and FTP servers, relying on socket.bind() to raise an "address
# already in use" exception. However, because the sockets specify
# SO_REUSEADDR, socket.bind() may not raise that exception.
# See <http://zope.org/Collectors/Zope/1104> for gory details.
## conf = self.load_config_text("""
## instancehome <<INSTANCE_HOME>>
## <http-server>
## address 18092
## </http-server>
## <ftp-server>
## # conflict
## address 18092
## </ftp-server>""")
## starter = ZopeStarter(conf)
## # do the job the 'handler' would have done (call prepare)
## for server in conf.servers:
## server.prepare('', None, 'Zope', {}, None)
## try:
## self.assertRaises(ZConfig.ConfigurationError, starter.setupServers)
## finally:
## del conf.servers
def testDropPrivileges(self):
# somewhat incomplete because we we're never running as root
......@@ -205,23 +214,30 @@ class ZopeStarterTestCase(unittest.TestCase):
try:
os.getuid = _return0
# no effective user
conf = self.load_config_text("")
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>""")
starter = ZopeStarter(conf)
self.assertRaises(ZConfig.ConfigurationError,
starter.dropPrivileges)
# cant find user in passwd database
conf = self.load_config_text("effective-user n0sucHuS3r")
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
effective-user n0sucHuS3r""")
starter = ZopeStarter(conf)
self.assertRaises(ZConfig.ConfigurationError,
starter.dropPrivileges)
# can't specify '0' as effective user
conf = self.load_config_text("effective-user 0")
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
effective-user 0""")
starter = ZopeStarter(conf)
self.assertRaises(ZConfig.ConfigurationError,
starter.dropPrivileges)
# setuid to test runner's uid XXX will this work cross-platform?
runnerid = _old_getuid()
conf = self.load_config_text("effective-user %s" % runnerid)
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
effective-user %s""" % runnerid)
starter = ZopeStarter(conf)
finished = starter.dropPrivileges()
self.failUnless(finished)
......@@ -233,6 +249,7 @@ class ZopeStarterTestCase(unittest.TestCase):
import logging
import sys
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
debug-mode off
<eventlog>
level info
......@@ -277,14 +294,27 @@ class ZopeStarterTestCase(unittest.TestCase):
def testMakeLockFile(self):
# put something in the way (it should be deleted)
name = os.path.join(TEMPNAME, 'lock')
conf = self.load_config_text("lock-filename %s" % name)
f = open(name, 'a')
f.write('hello')
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
lock-filename %s""" % name
)
f = open(name, 'ab')
# On Windows, the first byte of the file is locked solid, and even
# we (this process) can't read from it via a file object other
# than the one passed to lock_file. So we put a blank
# in the test value first, so we can skip over it later. Also,
# because .seek(1) isn't well-defined for files opened in text
# mode, we open the file in binary mode (above and below).
f.write(' hello')
f.close()
try:
starter = ZopeStarter(conf)
starter.makeLockFile()
self.failIf(open(name).read().find('hello') > -1)
f = open(name, 'rb')
f.seek(1) # skip over the locked byte
guts = f.read()
f.close()
self.failIf(guts.find('hello') > -1)
finally:
starter.unlinkLockFile()
self.failIf(os.path.exists(name))
......@@ -292,7 +322,10 @@ class ZopeStarterTestCase(unittest.TestCase):
def testMakePidFile(self):
# put something in the way (it should be deleted)
name = os.path.join(TEMPNAME, 'pid')
conf = self.load_config_text("pid-filename %s" % name)
conf = self.load_config_text("""
instancehome <<INSTANCE_HOME>>
pid-filename %s""" % name
)
f = open(name, 'a')
f.write('hello')
f.close()
......@@ -304,8 +337,35 @@ class ZopeStarterTestCase(unittest.TestCase):
starter.unlinkPidFile()
self.failIf(os.path.exists(name))
def testZopeRunConfigure(self):
old_config = getConfiguration()
try:
os.mkdir(TEMPNAME)
os.mkdir(TEMPPRODUCTS)
except OSError, why:
if why == 17:
# already exists
pass
try:
fname = os.path.join(TEMPNAME, 'zope.conf')
from Zope import configure
f = open(fname, 'w')
f.write('instancehome %s\nzserver-threads 100\n' % TEMPNAME)
f.flush()
f.close()
configure(fname)
new_config = getConfiguration()
self.failUnlessEqual(new_config.zserver_threads, 100)
finally:
try:
os.unlink(fname)
except:
pass
setConfiguration(old_config)
def test_suite():
return unittest.makeSuite(ZopeStarterTestCase)
if __name__ == "__main__":
unittest.main(defaultTest="test_suite")
......@@ -134,16 +134,8 @@ class ZopeCmd(ZDCmd):
ZDCmd.do_start(self, arg)
def get_startup_cmd(self, python, more):
cmdline = ( '%s -c "from Zope.Startup.options import ZopeOptions; '
'from Zope.Startup import handlers as h; '
'from App import config; '
'opts=ZopeOptions(); '
'opts.configfile=\'%s\'; '
'opts.realize(); '
'h.handleConfig(opts.configroot,opts.confighandlers);'
'config.setConfiguration(opts.configroot); '
'from Zope.Startup import dropPrivileges; '
'dropPrivileges(opts.configroot); '%
cmdline = ( '%s -c "from Zope import configure;'
'configure(\'%s\');' %
(python, self.options.configfile)
)
return cmdline + more + '\"'
......
......@@ -56,6 +56,7 @@ def debug(*args, **kw):
import ZPublisher
return ZPublisher.test('Zope', *args, **kw)
from Zope.Startup.run import configure
# Zope.App.startup.startup() sets the following variables in this module.
DB = None
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment