Commit 14aec927 authored by Toby Dickenson's avatar Toby Dickenson

clean shutdown

parent 7b8a63c4
...@@ -609,6 +609,11 @@ class FTPServer(ftp_server): ...@@ -609,6 +609,11 @@ class FTPServer(ftp_server):
self.port self.port
)) ))
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==2:
self.log_info('closing FTP to new connections')
self.close()
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if self.shutup: return if self.shutup: return
asyncore.dispatcher.log_info(self, message, type) asyncore.dispatcher.log_info(self, message, type)
......
...@@ -281,7 +281,8 @@ class zhttp_handler: ...@@ -281,7 +281,8 @@ class zhttp_handler:
class zhttp_channel(http_channel): class zhttp_channel(http_channel):
"http channel" "http channel"
closed=0 closed = 0
no_more_requests = 0
zombie_timeout=100*60 # 100 minutes zombie_timeout=100*60 # 100 minutes
max_header_len = 8196 max_header_len = 8196
...@@ -302,10 +303,23 @@ class zhttp_channel(http_channel): ...@@ -302,10 +303,23 @@ class zhttp_channel(http_channel):
push_with_producer=push push_with_producer=push
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==3:
# This is the shutdown phase where we are trying to finish processing
# outstanding requests, and not accept any more
self.no_more_requests = 1
if self.working or self.writable():
# We are busy working on an old request. Try to stall shutdown
return 1
else:
# We are no longer busy. Close ourself and allow shutdown to proceed
self.close()
return 0
def work(self): def work(self):
"try to handle a request" "try to handle a request"
if not self.working: if not self.working:
if self.queue: if self.queue and not self.no_more_requests:
self.working=1 self.working=1
try: module_name, request, response=self.queue.pop(0) try: module_name, request, response=self.queue.pop(0)
except: return except: return
...@@ -368,6 +382,11 @@ class zhttp_server(http_server): ...@@ -368,6 +382,11 @@ class zhttp_server(http_server):
self.server_port self.server_port
)) ))
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==2:
self.log_info('closing HTTP to new connections')
self.close()
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if self.shutup: return if self.shutup: return
dispatcher.log_info(self, message, type) dispatcher.log_info(self, message, type)
......
...@@ -33,6 +33,7 @@ ICP_OP_DENIED = 22 ...@@ -33,6 +33,7 @@ ICP_OP_DENIED = 22
class BaseICPServer(asyncore.dispatcher): class BaseICPServer(asyncore.dispatcher):
REQUESTS_PER_LOOP = 4 REQUESTS_PER_LOOP = 4
_shutdown = 0
def __init__ (self,ip,port): def __init__ (self,ip,port):
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
...@@ -45,6 +46,21 @@ class BaseICPServer(asyncore.dispatcher): ...@@ -45,6 +46,21 @@ class BaseICPServer(asyncore.dispatcher):
addr = ip addr = ip
self.log_info('ICP server started\n\tAddress: %s\n\tPort: %s' % (addr,port) ) self.log_info('ICP server started\n\tAddress: %s\n\tPort: %s' % (addr,port) )
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==1:
# Stop responding to requests.
if not self._shutdown:
self._shutdown = 1
self.log_info('shutting down ICP')
if time_in_this_phase<2.0:
# We have not yet been deaf long enough for our front end proxies to notice.
# Do not allow shutdown to proceed yet
return 1
else:
# Shutdown can proceed. We dont need a socket any more
self.close()
return 0
def handle_read(self): def handle_read(self):
for i in range(self.REQUESTS_PER_LOOP): for i in range(self.REQUESTS_PER_LOOP):
try: try:
...@@ -61,7 +77,7 @@ class BaseICPServer(asyncore.dispatcher): ...@@ -61,7 +77,7 @@ class BaseICPServer(asyncore.dispatcher):
self.socket.sendto(reply,whence) self.socket.sendto(reply,whence)
def readable(self): def readable(self):
return 1 return not self._shutdown
def writable(self): def writable(self):
return 0 return 0
......
...@@ -8,6 +8,11 @@ Zope Changes ...@@ -8,6 +8,11 @@ Zope Changes
Features Added Features Added
- Zope's shutdown process is now more careful and clean. It
avoids sending half-complete responses to clients, particularly
when running in a cluster using ICP.
http://dev.zope.org/Wikis/DevSite/Proposals/CleanShutdown
- ZTUtils.Iterator now handles any object that has a Python - ZTUtils.Iterator now handles any object that has a Python
iterator, resolving Collector #385 and #577. The iterator, resolving Collector #385 and #577. The
implementation is somwhat simpler, as well. implementation is somwhat simpler, as well.
......
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
# #
############################################################################## ##############################################################################
__doc__="""System management components""" __doc__="""System management components"""
__version__='$Revision: 1.81 $'[11:-2] __version__='$Revision: 1.82 $'[11:-2]
import sys,os,time,Globals, Acquisition, os, Undo import sys,os,time,Globals, Acquisition, os, Undo
...@@ -28,6 +28,7 @@ from version_txt import version_txt ...@@ -28,6 +28,7 @@ from version_txt import version_txt
from cStringIO import StringIO from cStringIO import StringIO
from AccessControl import getSecurityManager from AccessControl import getSecurityManager
import zLOG import zLOG
import Lifetime
try: import thread try: import thread
except: get_ident=lambda: 0 except: get_ident=lambda: 0
...@@ -326,12 +327,12 @@ class ApplicationManager(Folder,CacheManager): ...@@ -326,12 +327,12 @@ class ApplicationManager(Folder,CacheManager):
zLOG.LOG("ApplicationManager", zLOG.INFO, zLOG.LOG("ApplicationManager", zLOG.INFO,
"Restart requested by %s" % user) "Restart requested by %s" % user)
for db in Globals.opened: db.close() for db in Globals.opened: db.close()
raise SystemExit, """<html> Lifetime.shutdown(1)
return """<html>
<head><meta HTTP-EQUIV=REFRESH CONTENT="5; URL=%s/manage_main"> <head><meta HTTP-EQUIV=REFRESH CONTENT="5; URL=%s/manage_main">
</head> </head>
<body>Zope is restarting</body></html> <body>Zope is restarting</body></html>
""" % URL1 """ % URL1
sys.exit(1)
def manage_shutdown(self): def manage_shutdown(self):
"""Shut down the application""" """Shut down the application"""
...@@ -341,8 +342,13 @@ class ApplicationManager(Folder,CacheManager): ...@@ -341,8 +342,13 @@ class ApplicationManager(Folder,CacheManager):
user = 'unknown user' user = 'unknown user'
zLOG.LOG("ApplicationManager", zLOG.INFO, zLOG.LOG("ApplicationManager", zLOG.INFO,
"Shutdown requested by %s" % user) "Shutdown requested by %s" % user)
for db in Globals.opened: db.close() #for db in Globals.opened: db.close()
sys.exit(0) Lifetime.shutdown(0)
return """<html>
<head>
</head>
<body>Zope is shutting down</body></html>
"""
def manage_pack(self, days=0, REQUEST=None): def manage_pack(self, days=0, REQUEST=None):
"""Pack the database""" """Pack the database"""
......
import sys, asyncore, time
_shutdown_phase = 0
_shutdown_timeout = 30 # seconds per phase
# The shutdown phase counts up from 0 to 4.
#
# 0 Not yet terminating. running in main loop
#
# 1 Loss of service is imminent. Prepare any front-end proxies for this happening
# by stopping any ICP servers, so that they can choose to send requests to other
# Zope servers in the cluster.
#
# 2 Stop accepting any new requests.
#
# 3 Wait for all old requests to have been processed
#
# 4 Already terminated
#
# It is up to individual socket handlers to implement these actions, by providing the
# 'clean_shutdown_control' method. This is called intermittantly during shutdown with
# two parameters; the current phase number, and the amount of time that it has currently
# been in that phase. This method should return true if it does not yet want shutdown to
# proceed to the next phase.
def shutdown(exit_code,fast = 0):
global _shutdown_phase
global _shutdown_timeout
if _shutdown_phase == 0:
# Thread safety? proably no need to care
sys.ZServerExitCode = exit_code
_shutdown_phase = 1
if fast:
# Someone wants us to shutdown fast. This is hooked into SIGTERM - so possibly
# the system is going down and we can expect a SIGKILL within a few seconds.
# Limit each shutdown phase to one second. This is fast enough, but still clean.
_shutdown_timeout = 1.0
def loop():
# Run the main loop until someone calls shutdown()
lifetime_loop()
# Gradually close sockets in the right order, while running a select
# loop to allow remaining requests to trickle away.
graceful_shutdown_loop()
def lifetime_loop():
# The main loop. Stay in here until we need to shutdown
map = asyncore.socket_map
timeout = 30.0
while map and _shutdown_phase == 0:
asyncore.poll(timeout, map)
def graceful_shutdown_loop():
# The shutdown loop. Allow various services to shutdown gradually.
global _shutdown_phase
timestamp = time.time()
timeout = 1.0
map = asyncore.socket_map
while map and _shutdown_phase < 4:
time_in_this_phase = time.time()-timestamp
veto = 0
for fd,obj in map.items():
try:
fn = getattr(obj,'clean_shutdown_control')
except AttributeError:
pass
else:
try:
veto = veto or fn(_shutdown_phase,time_in_this_phase)
except:
obj.handle_error()
if veto and time_in_this_phase<_shutdown_timeout:
# Any open socket handler can veto moving on to the next shutdown phase.
# (but not forever)
asyncore.poll(timeout, map)
else:
# No vetos? That is one step closer to shutting down
_shutdown_phase += 1
timestamp = time.time()
\ No newline at end of file
...@@ -609,6 +609,11 @@ class FTPServer(ftp_server): ...@@ -609,6 +609,11 @@ class FTPServer(ftp_server):
self.port self.port
)) ))
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==2:
self.log_info('closing FTP to new connections')
self.close()
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if self.shutup: return if self.shutup: return
asyncore.dispatcher.log_info(self, message, type) asyncore.dispatcher.log_info(self, message, type)
......
...@@ -281,7 +281,8 @@ class zhttp_handler: ...@@ -281,7 +281,8 @@ class zhttp_handler:
class zhttp_channel(http_channel): class zhttp_channel(http_channel):
"http channel" "http channel"
closed=0 closed = 0
no_more_requests = 0
zombie_timeout=100*60 # 100 minutes zombie_timeout=100*60 # 100 minutes
max_header_len = 8196 max_header_len = 8196
...@@ -302,10 +303,23 @@ class zhttp_channel(http_channel): ...@@ -302,10 +303,23 @@ class zhttp_channel(http_channel):
push_with_producer=push push_with_producer=push
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==3:
# This is the shutdown phase where we are trying to finish processing
# outstanding requests, and not accept any more
self.no_more_requests = 1
if self.working or self.writable():
# We are busy working on an old request. Try to stall shutdown
return 1
else:
# We are no longer busy. Close ourself and allow shutdown to proceed
self.close()
return 0
def work(self): def work(self):
"try to handle a request" "try to handle a request"
if not self.working: if not self.working:
if self.queue: if self.queue and not self.no_more_requests:
self.working=1 self.working=1
try: module_name, request, response=self.queue.pop(0) try: module_name, request, response=self.queue.pop(0)
except: return except: return
...@@ -368,6 +382,11 @@ class zhttp_server(http_server): ...@@ -368,6 +382,11 @@ class zhttp_server(http_server):
self.server_port self.server_port
)) ))
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==2:
self.log_info('closing HTTP to new connections')
self.close()
def log_info(self, message, type='info'): def log_info(self, message, type='info'):
if self.shutup: return if self.shutup: return
dispatcher.log_info(self, message, type) dispatcher.log_info(self, message, type)
......
...@@ -33,6 +33,7 @@ ICP_OP_DENIED = 22 ...@@ -33,6 +33,7 @@ ICP_OP_DENIED = 22
class BaseICPServer(asyncore.dispatcher): class BaseICPServer(asyncore.dispatcher):
REQUESTS_PER_LOOP = 4 REQUESTS_PER_LOOP = 4
_shutdown = 0
def __init__ (self,ip,port): def __init__ (self,ip,port):
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
...@@ -45,6 +46,21 @@ class BaseICPServer(asyncore.dispatcher): ...@@ -45,6 +46,21 @@ class BaseICPServer(asyncore.dispatcher):
addr = ip addr = ip
self.log_info('ICP server started\n\tAddress: %s\n\tPort: %s' % (addr,port) ) self.log_info('ICP server started\n\tAddress: %s\n\tPort: %s' % (addr,port) )
def clean_shutdown_control(self,phase,time_in_this_phase):
if phase==1:
# Stop responding to requests.
if not self._shutdown:
self._shutdown = 1
self.log_info('shutting down ICP')
if time_in_this_phase<2.0:
# We have not yet been deaf long enough for our front end proxies to notice.
# Do not allow shutdown to proceed yet
return 1
else:
# Shutdown can proceed. We dont need a socket any more
self.close()
return 0
def handle_read(self): def handle_read(self):
for i in range(self.REQUESTS_PER_LOOP): for i in range(self.REQUESTS_PER_LOOP):
try: try:
...@@ -61,7 +77,7 @@ class BaseICPServer(asyncore.dispatcher): ...@@ -61,7 +77,7 @@ class BaseICPServer(asyncore.dispatcher):
self.socket.sendto(reply,whence) self.socket.sendto(reply,whence)
def readable(self): def readable(self):
return 1 return not self._shutdown
def writable(self): def writable(self):
return 0 return 0
......
...@@ -906,5 +906,8 @@ except: ...@@ -906,5 +906,8 @@ except:
# Start Medusa, Ye Hass! # Start Medusa, Ye Hass!
sys.ZServerExitCode=0 sys.ZServerExitCode=0
asyncore.loop() import Lifetime
sys.exit(sys.ZServerExitCode) Lifetime.loop()
code = sys.ZServerExitCode
zLOG.LOG("z2", zLOG.INFO, 'Exiting with code %d' % code )
sys.exit(code)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment