Commit 57afe6a8 authored by Jeremy Hylton's avatar Jeremy Hylton

Merge ZEO2-branch to trunk. (File removal.)

parent 81f586c4
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Facility for (roughly) atomically invalidating cache entries.
Note that this is not *really* atomic, but it is close enough.
"""
import cPickle
import tempfile
class Invalidator:
_d=None
def __init__(self, dinvalidate, cinvalidate):
self.dinvalidate=dinvalidate
self.cinvalidate=cinvalidate
def close(self):
self.dinvalidate = None
self.cinvalidate = None
def begin(self):
self._tfile=tempfile.TemporaryFile()
pickler=cPickle.Pickler(self._tfile, 1)
pickler.fast=1 # Don't use the memo
self._d=pickler.dump
def invalidate(self, args):
if self._d is None: return
self._d(args)
def end(self):
if self._d is None: return
self._d((0,0))
self._d=None
self._tfile.seek(0)
load=cPickle.Unpickler(self._tfile).load
self._tfile=None
cinvalidate=self.cinvalidate
dinvalidate=self.dinvalidate
while 1:
oid, version = load()
if not oid: break
cinvalidate(oid, version=version)
dinvalidate(oid, version=version)
def Invalidate(self, args):
cinvalidate=self.cinvalidate
dinvalidate=self.dinvalidate
for oid, version in args:
cinvalidate(oid, version=version)
dinvalidate(oid, version=version)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A wrapper for asyncore that provides robust exception handling.
The poll() and loop() calls exported by asyncore can raise exceptions.
asyncore uses either the select() or poll() system call. It is
possible for those system calls to fail, returning, for example,
EINTR. Python raises a select.error when an error occurs. If the
program using asyncore doesn't catch the exception, it will die with
an uncaught exception.
This module exports safer versions of loop() and poll() that wrap the
asyncore calls in try/except handlers that catch the errors and do the
right thing. In most cases, it is safe to catch the error and simply
retry the call.
XXX Operations on asyncore sockets can also fail with exceptions that
can safely be caught and ignored by user programs. It's not clear if
it would be useful to extend this module with wrappers for those
errors.
"""
# XXX The current implementation requires Python 2.0. Not sure if
# that's acceptable, depends on how many users want to combine ZEO 1.0
# and Zope 2.3.
import asyncore
import errno
import select
def loop(*args, **kwargs):
while 1:
try:
apply(asyncore.loop, args, kwargs)
except select.error, err:
if err[0] != errno.EINTR:
raise
else:
break
def poll(*args, **kwargs):
try:
apply(asyncore.poll, args, kwargs)
except select.error, err:
if err[0] != errno.EINTR:
raise
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""ZEO depends on recent versions of asyncore and cPickle
Try to fix up the imports of these to make these dependencies work,
localizing the hacks^H^H^H^H^Hchanges here.
"""
import sys, os
def whiff(where):
if not where: return 0
import imp
try: m=imp.find_module('ZServer', [where])
except: return 0
else: return 1
def fap():
# if we are using an old version of Python, our asyncore is likely to
# be out of date. If ZServer is sitting around, we can get a current
# version of ayncore from it. In any case, if we are going to be used
# with Zope, it's important to use the version from Zope.
try:
import ZServer
except:
# Try a little harder to import ZServer
import os, imp
location = package_home()
location = os.path.split(location)[0]
location = os.path.split(location)[0]
location = os.path.split(location)[0]
if whiff(location):
sys.path.append(location)
try:
import ZServer
except:
pass
import asyncore
if sys.version[:1] < '2' and asyncore.loop.func_code.co_argcount < 3:
raise ImportError, 'Cannot import an up-to-date asyncore'
sys.modules['ZEO.asyncore']=asyncore
# We need a recent version of cPickle too.
if sys.version[:3] < '1.6':
try:
from ZODB import cPickle
sys.modules['ZEO.cPickle']=cPickle
except:
# Try a little harder
import cPickle
else:
import cPickle
import cStringIO
p=cPickle.Pickler(cStringIO.StringIO(),1)
try:
p.fast=1
except:
raise ImportError, 'Cannot import an up-to-date cPickle'
p=cPickle.Unpickler(cStringIO.StringIO())
try:
p.find_global=1
except:
raise ImportError, 'Cannot import an up-to-date cPickle'
def package_home():
m=sys.modules[__name__]
if hasattr(m,'__path__'):
r=m.__path__[0]
elif "." in __name__:
from string import rfind
r=sys.modules[__name__[:rfind(__name__,'.')]].__path__[0]
else:
r=__name__
return os.path.join(os.getcwd(), r)
fap()
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Simple rpc mechanisms
"""
__version__ = "$Revision: 1.23 $"[11:-2]
from cPickle import loads
import cPickle
from thread import allocate_lock
from smac import SizedMessageAsyncConnection
import socket, string, struct, asyncore, sys, time, select
from zLOG import LOG, TRACE, DEBUG, INFO
from ZEO import asyncwrap
from errno import EINTR
TupleType=type(())
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo
dump=pickler.dump
class UnUnPickleableError(Exception):
"Couldn't unpickle a remote exception"
class asyncRPC(SizedMessageAsyncConnection):
# Flag indicating whether a main loop is running. If one isn't running,
# then we'll have to provide our own main loop at times.
__haveMainLoop=0
def __Wakeup(*args): pass
def __init__(self, connection, outOfBand=None, tmin=5, tmax=300, debug=0):
self._connection=connection
self._outOfBand=outOfBand
self._tmin, self._tmax = tmin, tmax
self._debug=debug
self.__closed = 0
l=allocate_lock() # Response lock used to wait for call results
self.__la=l.acquire
self.__lr=l.release
self.__r=None
l.acquire()
l=allocate_lock() # Response lock used to wait for call results
self.__call_la=l.acquire
self.__call_lr=l.release
def connect(self, tryonce=1):
t=self._tmin
connection = self._connection
debug=self._debug
while self.__closed == 0:
LOG("client", INFO,
'Trying to connect to server: %s' % `connection`)
try:
if type(connection) is type(''):
s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
else:
s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(connection)
except Exception, err:
if debug is not None:
debug.blather("Failed to connect to server: %s" % err)
if tryonce: return 0
time.sleep(t)
t=t*2
if t > self._tmax: t=self._tmax
else:
if debug is not None:
debug.blather("Connected to server")
# Make sure the result lock is set, se we don't
# get an old result (e.g. the exception that
# we generated on close).
self.__r=None
self.__la(0)
self.aq_parent.notifyConnected(s)
return 1
def finishConnect(self, s):
if self.__haveMainLoop:
map = None # use the main loop map
else:
map = {} # provide a dummy map
SizedMessageAsyncConnection.__init__(self, s, '', map)
# we are our own socket map!
def keys(self): return (self._fileno,)
def values(self): return (self,)
def items(self): return ((self._fileno,self),)
def __len__(self): return 1
def __getitem__(self, key):
if key==self._fileno: return self
raise KeyError, key
def sync(self):
if self.__haveMainLoop: return # in async mode
# Ick, I have to do my own select loop, which sucks
while 1:
try: r, w, e = select.select([self._fileno],[],[],0.0)
except select.error, v:
if v[0] != EINTR: raise
if r: asyncwrap.poll(0.0, self)
else: break
def readLoop(self):
la=self.__la
while not la(0):
asyncwrap.poll(60.0, self)
self.__lr()
def setLoop(self, map=None, Wakeup=lambda : None):
if map is None: self.__haveMainLoop=0
else:
self.add_channel(map) # asyncore registration
self.__haveMainLoop=1
self.__Wakeup=Wakeup
def __call__(self, *args):
self.__call_la()
try:
self._last_args=args=dump(args,1)
self.message_output(args)
if self.__haveMainLoop:
self.__Wakeup() # Wakeup the main loop
else: self.readLoop()
while 1:
r=self._read()
c=r[:1]
if c=='R':
if r=='RN.': return None # Common case!
return loads(r[1:])
# If c == 'E', an error occured on the server. In
# this case, the return value is a pickled exception.
# Unpickle it and raise it on the client side. The
# traceback for this exception ends at this method,
# but the real error occurred somewhere in the server
# code. To diagnose the error, look for the real
# traceback in the server's zLOG output.
if c=='E':
try: r=loads(r[1:])
except:
raise UnUnPickleableError(r[1:])
if type(r) is TupleType:
raise r[0], r[1] # see server log for real traceback
raise r
oob=self._outOfBand
if oob is not None:
r=r[1:]
if r=='N.': r=None # Common case!
else: r=loads(r)
oob(c, r)
else:
raise UnrecognizedResult, r
finally:
self._last_args=''
self.__call_lr()
def sendMessage(self, *args):
self.message_output(dump(args,1))
if self.__haveMainLoop:
self.__Wakeup() # Wake up the main loop
else: asyncwrap.poll(0.0, self)
def setOutOfBand(self, f):
"""Define a call-back function for handling out-of-band communication
Normal communications from the server consists of call returns
and exception returns. The server may also send asynchronous
messages to the client. For the client to recieve these
messages, it must register an out-of-band callback
function. The function will be called with a single-character
message code and a message argument.
"""
self._outOfBand=f
def message_input(self, m):
if self._debug is not None:
if len(m) > 60:
md = repr(m[:60]) + ' ...'
else:
md = repr(m)
self._debug.trace('message_input %s' % md)
c=m[:1]
if c in 'RE':
self.__r=m
try: self.__lr()
except:
# Eek, this should never happen. We're messed up.
# we'd better close the connection.
self.close()
raise
else:
oob=self._outOfBand
if oob is not None:
m=m[1:]
if m=='N.': m=None
else: m=loads(m)
oob(c, m)
def _read(self):
self.__la()
return self.__r
def closeIntensionally(self):
if self.__haveMainLoop:
# We aren't willing to close until told to by the main loop.
# So we'll tell the main loop to tell us. :)
self.__Wakeup(lambda self=self: self.close())
else:
self.close()
self._outOfBand = None
self.__closed = 1
def close(self):
asyncRPC.inheritedAttribute('close')(self)
self.aq_parent.notifyDisconnected(self)
# causes read call to raise last exception, which should be
# the socket error that caused asyncore to close the socket.
self.__r = 'E' + dump(sys.exc_info()[:2], 1)
try:
self.__lr()
except:
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment