##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL).  A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""Simple rpc mechanisms
"""

17
__version__ = "$Revision: 1.23 $"[11:-2]
Christopher Petrilli's avatar
Christopher Petrilli committed
18

19 20
from cPickle import loads
import cPickle
Jim Fulton's avatar
Jim Fulton committed
21
from thread import allocate_lock
22
from smac import SizedMessageAsyncConnection
23
import socket, string, struct, asyncore, sys, time, select
24
from zLOG import LOG, TRACE, DEBUG, INFO
Jeremy Hylton's avatar
Jeremy Hylton committed
25 26 27 28
from ZEO import asyncwrap

from errno import EINTR
TupleType=type(())
Jim Fulton's avatar
Jim Fulton committed
29

30 31 32 33 34 35
# We create a special fast pickler! This allows us
# to create slightly more efficient pickles and
# to create them a tad faster.
#
# The pickler is created once at import time and its bound ``dump``
# method is shared by every asyncRPC instance in this module (see the
# ``dump(args, 1)`` calls below, whose result is handed straight to
# ``message_output``).
# NOTE(review): a single module-level pickler is shared state; confirm
# that callers never pickle concurrently from several threads.
pickler=cPickle.Pickler()
pickler.fast=1 # Don't use the memo
dump=pickler.dump
Jim Fulton's avatar
Jim Fulton committed
36

37 38 39
class UnUnPickleableError(Exception):
    """Couldn't unpickle a remote exception"""


class UnrecognizedResult(Exception):
    """A message with an unknown code arrived and nobody can handle it.

    Raised by asyncRPC.__call__ when a reply is neither a call return
    ('R') nor an exception return ('E') and no out-of-band handler has
    been registered.  The exception argument is the raw message.
    """

40
class asyncRPC(SizedMessageAsyncConnection):
Jim Fulton's avatar
Jim Fulton committed
41

42 43 44
    # Flag indicating whether a main loop is running. If one isn't running,
    # then we'll have to provide our own main loop at times.
    __haveMainLoop=0  
45
    # Default wake-up hook: accepts any arguments and does nothing.
    # setLoop() replaces it per instance with the caller-supplied hook.
    def __Wakeup(*args): pass
Jim Fulton's avatar
Jim Fulton committed
46

47 48
    def __init__(self, connection, outOfBand=None, tmin=5, tmax=300, debug=0):
        """Record the connection parameters and set up the two locks.

        connection -- address handed to socket.connect() by connect();
                      a string selects an AF_UNIX socket, anything else
                      an AF_INET socket.
        outOfBand  -- optional callback for out-of-band messages (see
                      setOutOfBand).
        tmin, tmax -- initial and maximum delay, in seconds, between
                      connection attempts made by connect().
        debug      -- optional object whose blather() and trace()
                      methods receive diagnostic messages.
        """
        self._connection = connection
        self._outOfBand = outOfBand
        self._tmin = tmin
        self._tmax = tmax
        self._debug = debug
        self.__closed = 0
        self.__r = None

        # Response lock used to wait for call results.  It starts out
        # held: message_input() (or close()) releases it once a result
        # has been stored in self.__r, which lets _read() acquire it.
        response_lock = allocate_lock()
        response_lock.acquire()
        self.__la = response_lock.acquire
        self.__lr = response_lock.release

        # Call lock held for the whole duration of __call__, so that
        # only one request/response exchange is in flight at a time.
        call_lock = allocate_lock()
        self.__call_la = call_lock.acquire
        self.__call_lr = call_lock.release

64
    def connect(self, tryonce=1):
65 66 67
        t=self._tmin
        connection = self._connection
        debug=self._debug
68
        while self.__closed == 0:
69 70
            LOG("client", INFO,
                'Trying to connect to server: %s' % `connection`)
71 72 73 74 75 76
            try:
                if type(connection) is type(''):
                    s=socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                else:
                    s=socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                s.connect(connection)    
77
            except Exception, err:
78 79
                if debug is not None:
                    debug.blather("Failed to connect to server: %s" % err)
80 81 82 83 84
                if tryonce: return 0
                time.sleep(t)
                t=t*2
                if t > self._tmax: t=self._tmax
            else:
85 86
                if debug is not None:
                    debug.blather("Connected to server")
87 88 89 90 91 92 93 94 95 96 97
                    
                # Make sure the result lock is set, se we don't
                # get an old result (e.g. the exception that
                # we generated on close).
                self.__r=None
                self.__la(0)
                
                self.aq_parent.notifyConnected(s)
                return 1

    def finishConnect(self, s):
        """Initialise the underlying sized-message connection on socket s.

        When a main loop is running, None is passed as the socket map so
        that the main loop's map is used; otherwise a private dummy map
        is supplied.
        """
        socket_map = {} # provide a dummy map
        if self.__haveMainLoop:
            socket_map = None # use the main loop map
        SizedMessageAsyncConnection.__init__(self, s, '', socket_map)
103 104 105 106 107 108 109 110 111 112

    # we are our own socket map!
    # The methods below make this object look like a one-entry mapping
    # from its own file number to itself.
    def keys(self):
        return (self._fileno,)

    def values(self):
        return (self,)

    def items(self):
        return ((self._fileno, self),)

    def __len__(self):
        return 1

    def __getitem__(self, key):
        if key != self._fileno:
            raise KeyError(key)
        return self

113
    def sync(self):
114
        if self.__haveMainLoop: return # in async mode
115 116 117 118 119 120

        # Ick, I have to do my own select loop, which sucks
        while 1:
            try: r, w, e = select.select([self._fileno],[],[],0.0)
            except select.error, v:
                if v[0] != EINTR: raise
Jeremy Hylton's avatar
Jeremy Hylton committed
121
            if r: asyncwrap.poll(0.0, self)
122 123
            else: break

124 125 126
    def readLoop(self):
        """Poll the connection until a call result has arrived.

        Keeps polling (with a 60 second timeout per poll) until the
        response lock can be acquired without blocking, then releases
        it again so that a following _read() can acquire it.
        """
        try_acquire = self.__la
        while 1:
            if try_acquire(0): break
            asyncwrap.poll(60.0, self)
        self.__lr()

    def setLoop(self, map=None, Wakeup=lambda : None):
        """Tell the connection whether an external main loop is running.

        map    -- socket map of the main loop.  When given, this channel
                  is registered with it and main-loop mode is switched
                  on; when None, main-loop mode is switched off.
        Wakeup -- callable stored as the wake-up hook.  It is called
                  with no arguments after output has been queued.
        NOTE(review): closeIntensionally() calls the hook with one
        argument, which the default lambda here does not accept --
        confirm that callers always supply their own Wakeup in
        main-loop mode.
        """
        if map is not None:
            self.add_channel(map) # asyncore registration
            self.__haveMainLoop = 1
        else:
            self.__haveMainLoop = 0

        self.__Wakeup = Wakeup
137

138
         
Jim Fulton's avatar
Jim Fulton committed
139
    def __call__(self, *args):
140 141 142 143 144
        self.__call_la()
        try:
            self._last_args=args=dump(args,1)
            self.message_output(args)

145 146
            if self.__haveMainLoop:
                self.__Wakeup() # Wakeup the main loop
147 148 149 150 151 152 153 154
            else: self.readLoop()

            while 1:
                r=self._read()
                c=r[:1]
                if c=='R':
                    if r=='RN.': return None # Common case!
                    return loads(r[1:])
155 156 157 158 159 160 161 162
                
                # If c == 'E', an error occured on the server.  In
                # this case, the return value is a pickled exception.
                # Unpickle it and raise it on the client side.  The
                # traceback for this exception ends at this method,
                # but the real error occurred somewhere in the server
                # code.  To diagnose the error, look for the real
                # traceback in the server's zLOG output.
163 164 165 166
                if c=='E':
                    try: r=loads(r[1:])
                    except:
                        raise UnUnPickleableError(r[1:])
167 168 169
                    if type(r) is TupleType:
                        raise r[0], r[1] # see server log for real traceback
                    raise r 
170 171 172 173 174 175 176 177 178 179 180
                oob=self._outOfBand
                if oob is not None:
                    r=r[1:]
                    if r=='N.': r=None # Common case!
                    else: r=loads(r)
                    oob(c, r)
                else:
                    raise UnrecognizedResult, r
        finally:
            self._last_args=''
            self.__call_lr()
Jim Fulton's avatar
Jim Fulton committed
181

182 183
    def sendMessage(self, *args):
        """Send args to the server without waiting for a reply.

        The pickled arguments are queued for output; then the main loop
        is woken up or, when there is none, the connection is polled
        once without blocking.
        """
        payload = dump(args, 1)
        self.message_output(payload)
        if not self.__haveMainLoop:
            asyncwrap.poll(0.0, self)
            return
        self.__Wakeup() # Wake up the main loop
Jim Fulton's avatar
Jim Fulton committed
187

188 189
    def setOutOfBand(self, f):
        """Define a call-back function for handling out-of-band communication

        Normal communications from the server consist of call returns
        and exception returns. The server may also send asynchronous
        messages to the client. For the client to receive these
        messages, it must register an out-of-band callback
        function. The function will be called with a single-character
        message code and a message argument.
        """

        self._outOfBand=f
Jim Fulton's avatar
Jim Fulton committed
200 201

    def message_input(self, m):
        """Handle one complete message m received from the server.

        A message whose first character is 'R' or 'E' is a call result:
        it is stored for _read() and the response lock is released.  Any
        other message is out-of-band: its first character is the message
        code and the rest is the pickled argument, both of which are
        passed to the out-of-band handler if one is registered;
        otherwise the message is dropped.
        """
        # The constructor's default for debug is 0, so the guard must be
        # a truth test: "is not None" would try to call 0.trace().
        debug = self._debug
        if debug:
            if len(m) > 60:
                md = repr(m[:60]) + ' ...'
            else:
                md = repr(m)
            debug.trace('message_input %s' % md)

        c = m[:1]
        if c in 'RE':
            self.__r = m
            try: self.__lr()
            except:
                # Eek, this should never happen. We're messed up.
                # we'd better close the connection.
                self.close()
                raise
        else:
            oob = self._outOfBand
            if oob is not None:
                m = m[1:]
                if m == 'N.': m = None
                # NOTE(review): this unpickles data read from the
                # network; that is only safe if the server is trusted.
                else: m = loads(m)
                oob(c, m)
Jim Fulton's avatar
Jim Fulton committed
225 226 227 228

    def _read(self):
        """Wait for a call result and return it.

        Blocks until the response lock can be acquired, then returns
        the raw message currently stored in self.__r.
        """
        self.__la()
        return self.__r
229 230

    def closeIntensionally(self):
        """Close the connection at the client's request.

        Without a main loop the connection is closed right away, the
        out-of-band handler is dropped and the connection is marked as
        closed.  With a main loop, the wake-up hook is handed a callback
        that closes the connection.
        """
        if not self.__haveMainLoop:
            self.close()
            self._outOfBand = None
            self.__closed = 1
            return

        # We aren't willing to close until told to by the main loop.
        # So we'll tell the main loop to tell us. :)
        self.__Wakeup(lambda self=self: self.close())
Jim Fulton's avatar
Jim Fulton committed
239
        
240 241 242
    def close(self):
        """Close the connection and unblock any caller waiting in _read().

        Calls the inherited close(), notifies the parent through
        notifyDisconnected(), then stores the exception currently being
        handled as an 'E' result and releases the response lock.
        """
        asyncRPC.inheritedAttribute('close')(self)
        self.aq_parent.notifyDisconnected(self)

        # causes read call to raise last exception, which should be
        # the socket error that caused asyncore to close the socket.
        failure = sys.exc_info()[:2]
        self.__r = 'E' + dump(failure, 1)
        try:
            self.__lr()
        except:
            # Deliberately ignored: release is best-effort here.
            pass