Commit 465f8a0e authored by Jeremy Hylton's avatar Jeremy Hylton

Merge ZEO2-branch to trunk. (Files added on branch.)

parent 93a59dd3
The Connection object should be extended to support more flexible
handling for outstanding calls. In particular, it should be possible
to have multiple calls with return values outstanding.
The mechanism described here is based on the promises mechanism in
Argus, which was influenced by futures in Multilisp.
Promises: Linguistic Support for Efficient Asynchronous Procedure
Calls in Distributed Systems. Barbara Liskov and Liuba Shrira.
Proc. of Conf. on Programming Language Design and Implementation
(PLDI), June 1988.
We want to support two different kinds of calls:
- send : invoke a method that returns no value
- call : invoke a method that returns a value
On the client, a call immediately returns a promise. A promise is an
object that can be used to claim the return value when it becomes
available.
- ready(): returns true if the return value is ready or an exception
occurred
- claim(): returns the call's return value or raises an exception,
blocking if necessary
The server side of a zrpc connection can be implemented using
asyncore. In that case, a method call blocks other RPC activity until
it returns. If a call needs to return a value, but can't return
immediately, it returns a delay object (ZEO.zrpc.server.Delay).
When the zrpc connection receives a Delay object, it does not
immediately return to the caller. Instead, it returns when the
reply() method is called. A Delay has two methods:
- set_sender()
- reply(obj): returns obj to the sender
-----------------------------------------
Open issues:
Delayed exception
There is currently no mechanism to raise an exception from a delayed
pcall.
Synchronization
The following item is part of Argus, but the motivation isn't entirely
clear.
For any two calls, C1 and C2, C1 always starts on the server
first. For the promises, C2 is ready() iff C1 is also ready().
The promises can be claimed in any order.
A related notion:
The connection should also support a synch() method that returns
only when all outstanding calls have completed. If any of these
calls raised an exception, the synch() call raises an exception.
XXX synch() sounds potentially useful, but it's not clear if it would
be useful for ZEO. In ZEO a single connection object handles multiple
threads, each thread is going to make independent calls. When a
particular tpc_begin() returns and a thread commits its transaction,
it makes more calls. These calls will before any of the other
tpc_begin() calls.
I think the Argus approach would be to use separate handlers for each
thread (not sure Argus had threads), so that a single thread could
rely on ordering guarantees.
Multithreaded server
There are lots of issues to work out here.
Delays may not be necessary if the connecftion handler runs in a
different thread than the object the handles the calls.
\ No newline at end of file
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# zrpc is a package with the following modules
# error -- exceptions raised by zrpc
# marshal -- internal, handles basic protocol issues
# connection -- object dispatcher
# client -- manages connection creation to remote server
# server -- manages incoming connections from remote clients
# trigger -- medusa's trigger
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import errno
import select
import socket
import sys
import threading
import time
import types
import ThreadedAsync
import zLOG
from ZEO.zrpc.log import log
from ZEO.zrpc.trigger import trigger
from ZEO.zrpc.connection import ManagedConnection
class ConnectionManager:
"""Keeps a connection up over time"""
def __init__(self, addr, client, tmin=1, tmax=180):
self.set_addr(addr)
self.client = client
self.tmin = tmin
self.tmax = tmax
self.connected = 0
self.connection = None
self.closed = 0
# If _thread is not None, then there is a helper thread
# attempting to connect. _thread is protected by _connect_lock.
self._thread = None
self._connect_lock = threading.Lock()
self.trigger = None
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
def __repr__(self):
return "<%s for %s>" % (self.__class__.__name__, self.addr)
def set_addr(self, addr):
"Set one or more addresses to use for server."
# For backwards compatibility (and simplicity?) the
# constructor accepts a single address in the addr argument --
# a string for a Unix domain socket or a 2-tuple with a
# hostname and port. It can also accept a list of such addresses.
addr_type = self._guess_type(addr)
if addr_type is not None:
self.addr = [(addr_type, addr)]
else:
self.addr = []
for a in addr:
addr_type = self._guess_type(a)
if addr_type is None:
raise ValueError, "unknown address in list: %s" % repr(a)
self.addr.append((addr_type, a))
def _guess_type(self, addr):
if isinstance(addr, types.StringType):
return socket.AF_UNIX
if (len(addr) == 2
and isinstance(addr[0], types.StringType)
and isinstance(addr[1], types.IntType)):
return socket.AF_INET
# not anything I know about
return None
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
self._connect_lock.acquire()
try:
if self._thread is not None:
# XXX race on _thread
self._thread.stop()
self._thread.join()
finally:
self._connect_lock.release()
if self.connection:
self.connection.close()
if self.trigger is not None:
self.trigger.close()
def set_async(self, map):
# This is the callback registered with ThreadedAsync. The
# callback might be called multiple times, so it shouldn't
# create a trigger every time and should never do anything
# after it's closed.
# It may be that the only case where it is called multiple
# times is in the test suite, where ThreadedAsync's loop can
# be started in a child process after a fork. Regardless,
# it's good to be defensive.
# XXX need each connection started with async==0 to have a
# callback
if not self.closed and self.trigger is None:
self.trigger = trigger()
self.thr_async = 1 # XXX needs to be set on the Connection
def attempt_connect(self):
"""Attempt a connection to the server without blocking too long.
There isn't a crisp definition for too long. When a
ClientStorage is created, it attempts to connect to the
server. If the server isn't immediately available, it can
operate from the cache. This method will start the background
connection thread and wait a little while to see if it
finishes quickly.
"""
# XXX will a single attempt take too long?
self.connect()
try:
event = self._thread.one_attempt
except AttributeError:
# An AttributeError means that (1) _thread is None and (2)
# as a consquence of (1) that the connect thread has
# already exited.
pass
else:
event.wait()
return self.connected
def connect(self, sync=0):
if self.connected == 1:
return
self._connect_lock.acquire()
try:
if self._thread is None:
log("starting thread to connect to server")
self._thread = ConnectThread(self, self.client, self.addr,
self.tmin, self.tmax)
self._thread.start()
if sync:
try:
self._thread.join()
except AttributeError:
# probably means the thread exited quickly
pass
finally:
self._connect_lock.release()
def connect_done(self, c):
log("connect_done()")
self.connected = 1
self.connection = c
self._thread = None
def notify_closed(self, conn):
self.connected = 0
self.connection = None
self.client.notifyDisconnected()
if not self.closed:
self.connect()
class Connected(Exception):
# helper for non-local exit
def __init__(self, sock):
self.sock = sock
# When trying to do a connect on a non-blocking socket, some outcomes
# are expected. Set _CONNECT_IN_PROGRESS to the errno value(s) expected
# when an initial connect can't complete immediately. Set _CONNECT_OK
# to the errno value(s) expected if the connect succeeds *or* if it's
# already connected (our code can attempt redundant connects).
if hasattr(errno, "WSAEWOULDBLOCK"): # Windows
_CONNECT_IN_PROGRESS = (errno.WSAEWOULDBLOCK,)
_CONNECT_OK = (0, errno.WSAEISCONN)
else: # Unix
_CONNECT_IN_PROGRESS = (errno.EINPROGRESS,)
_CONNECT_OK = (0, errno.EISCONN)
class ConnectThread(threading.Thread):
"""Thread that tries to connect to server given one or more addresses.
The thread is passed a ConnectionManager and the manager's client
as arguments. It calls notifyConnected() on the client when a
socket connects. If notifyConnected() returns without raising an
exception, the thread is done; it calls connect_done() on the
manager and exits.
The thread will continue to run, attempting connections, until a
successful notifyConnected() or stop() is called.
"""
__super_init = threading.Thread.__init__
# We don't expect clients to call any methods of this Thread other
# than close() and those defined by the Thread API.
def __init__(self, mgr, client, addrs, tmin, tmax):
self.__super_init(name="Connect(%s)" % addrs)
self.mgr = mgr
self.client = client
self.addrs = addrs
self.tmin = tmin
self.tmax = tmax
self.stopped = 0
self.one_attempt = threading.Event()
# A ConnectThread keeps track of whether it has finished a
# call to attempt_connects(). This allows the
# ConnectionManager to make an attempt to connect right away,
# but not block for too long if the server isn't immediately
# available.
def stop(self):
self.stopped = 1
# Every method from run() to the end is used internally by the Thread.
def run(self):
delay = self.tmin
while not self.stopped:
success = self.attempt_connects()
if not self.one_attempt.isSet():
self.one_attempt.set()
if success:
break
time.sleep(delay)
delay *= 2
if delay > self.tmax:
delay = self.tmax
log("thread exiting: %s" % self.getName())
def close_sockets(self):
for s in self.sockets.keys():
s.close()
def attempt_connects(self):
"""Try connecting to all self.addrs addresses.
If at least one succeeds, pick a success arbitrarily, close all other
successes (if any), and return true. If none succeed, return false.
"""
self.sockets = {} # {open socket: connection address}
log("attempting connection on %d sockets" % len(self.addrs))
try:
for domain, addr in self.addrs:
if __debug__:
log("attempt connection to %s" % repr(addr),
level=zLOG.DEBUG)
try:
s = socket.socket(domain, socket.SOCK_STREAM)
except socket.error, err:
log("Failed to create socket with domain=%s: %s" % (
domain, err), level=zLOG.ERROR)
continue
s.setblocking(0)
self.sockets[s] = addr
# connect() raises Connected iff it succeeds
# XXX can still block for a while if addr requires DNS
self.connect(s)
# next wait until they actually connect
while self.sockets:
if self.stopped:
self.close_sockets()
return 0
try:
sockets = self.sockets.keys()
r, w, x = select.select([], sockets, sockets, 1.0)
except select.error:
continue
for s in x:
del self.sockets[s]
s.close()
for s in w:
# connect() raises Connected iff it succeeds
self.connect(s)
except Connected, container:
s = container.sock
del self.sockets[s] # don't close the newly connected socket
self.close_sockets()
return 1
return 0
def connect(self, s):
"""Call s.connect_ex(addr); raise Connected iff connection succeeds.
We have to handle several possible return values from
connect_ex(). If the socket is connected and the initial ZEO
setup works, we're done. Report success by raising an
exception. Yes, the is odd, but we need to bail out of the
select() loop in the caller and an exception is a principled
way to do the abort.
If the socket sonnects and the initial ZEO setup
(notifyConnected()) fails or the connect_ex() returns an
error, we close the socket, remove it from self.sockets, and
proceed with the other sockets.
If connect_ex() returns EINPROGRESS, we need to try again later.
"""
addr = self.sockets[s]
try:
e = s.connect_ex(addr)
except socket.error, msg:
log("failed to connect to %s: %s" % (addr, msg),
level=zLOG.ERROR)
else:
log("connect_ex(%s) == %s" % (addr, e))
if e in _CONNECT_IN_PROGRESS:
return
elif e in _CONNECT_OK:
# special cases to deal with winsock oddities
if sys.platform.startswith("win") and e == 0:
# It appears that winsock isn't behaving as
# expected on Win2k. It's possible for connect()
# to return 0, but the connection to have failed.
# In particular, in situations where I expect to
# get a Connection refused (10061), I'm seeing
# connect_ex() return 0. OTOH, it looks like
# select() is a more reliable indicator on
# Windows.
r, w, x = select.select([s], [s], [s], 0.1)
if not (r or w or x):
return
if x:
# see comment at the end of the function
s.close()
del self.socket[s]
c = self.test_connection(s, addr)
if c:
log("connected to %s" % repr(addr), level=zLOG.DEBUG)
raise Connected(s)
else:
log("error connecting to %s: %s" % (addr, errno.errorcode[e]),
level=zLOG.DEBUG)
# Any execution that doesn't raise Connected() or return
# because of CONNECT_IN_PROGRESS is an error. Make sure the
# socket is closed and remove it from the dict of pending
# sockets.
s.close()
del self.sockets[s]
def test_connection(self, s, addr):
# Establish a connection at the zrpc level and call the
# client's notifyConnected(), giving the zrpc application a
# chance to do app-level check of whether the connection is
# okay.
c = ManagedConnection(s, addr, self.client, self.mgr)
try:
self.client.notifyConnected(c)
except:
log("error connecting to server: %s" % str(addr),
level=zLOG.ERROR, error=sys.exc_info())
c.close()
# Closing the ZRPC connection will eventually close the
# socket, somewhere in asyncore.
return 0
self.mgr.connect_done(c)
return 1
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import sys
import threading
import types
import ThreadedAsync
from ZEO import smac # XXX put smac in zrpc?
from ZEO.zrpc.error import ZRPCError, DisconnectedError, DecodingError
from ZEO.zrpc.log import log, short_repr
from ZEO.zrpc.marshal import Marshaller
from ZEO.zrpc.trigger import trigger
import zLOG
from ZODB import POSException
REPLY = ".reply" # message name used for replies
ASYNC = 1
class Delay:
"""Used to delay response to client for synchronous calls
When a synchronous call is made and the original handler returns
without handling the call, it returns a Delay object that prevents
the mainloop from sending a response.
"""
def set_sender(self, msgid, send_reply):
self.msgid = msgid
self.send_reply = send_reply
def reply(self, obj):
self.send_reply(self.msgid, obj)
class Connection(smac.SizedMessageAsyncConnection):
"""Dispatcher for RPC on object on both sides of socket.
The connection supports synchronous calls, which expect a return,
and asynchronous calls that do not.
It uses the Marshaller class to handle encoding and decoding of
method calls are arguments.
A Connection is designed for use in a multithreaded application,
where a synchronous call must block until a response is ready.
The current design only allows a single synchronous call to be
outstanding.
A socket connection between a client and a server allows either
side to invoke methods on the other side. The processes on each
end of the socket use a Connection object to manage communication.
"""
__super_init = smac.SizedMessageAsyncConnection.__init__
__super_close = smac.SizedMessageAsyncConnection.close
__super_writable = smac.SizedMessageAsyncConnection.writable
__super_message_output = smac.SizedMessageAsyncConnection.message_output
protocol_version = "Z200"
def __init__(self, sock, addr, obj=None):
self.obj = None
self.marshal = Marshaller()
self.closed = 0
self.msgid = 0
self.__super_init(sock, addr)
# A Connection either uses asyncore directly or relies on an
# asyncore mainloop running in a separate thread. If
# thr_async is true, then the mainloop is running in a
# separate thread. If thr_async is true, then the asyncore
# trigger (self.trigger) is used to notify that thread of
# activity on the current thread.
self.thr_async = 0
self.trigger = None
self._prepare_async()
self._map = {self._fileno: self}
self.__call_lock = threading.Lock()
# The reply lock is used to block when a synchronous call is
# waiting for a response
self.__reply_lock = threading.Lock()
self.__reply_lock.acquire()
self.register_object(obj)
self.handshake()
def __repr__(self):
return "<%s %s>" % (self.__class__.__name__, self.addr)
def close(self):
if self.closed:
return
self.closed = 1
self.close_trigger()
self.__super_close()
def close_trigger(self):
# overridden by ManagedConnection
if self.trigger is not None:
self.trigger.close()
def register_object(self, obj):
"""Register obj as the true object to invoke methods on"""
self.obj = obj
def handshake(self):
# When a connection is created the first message sent is a
# 4-byte protocol version. This mechanism should allow the
# protocol to evolve over time, and let servers handle clients
# using multiple versions of the protocol.
# The mechanism replace the message_input() method for the
# first message received.
# The client sends the protocol version it is using.
self._message_input = self.message_input
self.message_input = self.recv_handshake
self.message_output(self.protocol_version)
def recv_handshake(self, message):
if message == self.protocol_version:
self.message_input = self._message_input
# otherwise do something else...
def message_input(self, message):
"""Decoding an incoming message and dispatch it"""
# XXX Not sure what to do with errors that reach this level.
# Need to catch ZRPCErrors in handle_reply() and
# handle_request() so that they get back to the client.
try:
msgid, flags, name, args = self.marshal.decode(message)
except DecodingError, msg:
return self.return_error(None, None, DecodingError, msg)
if __debug__:
log("recv msg: %s, %s, %s, %s" % (msgid, flags, name,
short_repr(args)),
level=zLOG.DEBUG)
if name == REPLY:
self.handle_reply(msgid, flags, args)
else:
self.handle_request(msgid, flags, name, args)
def handle_reply(self, msgid, flags, args):
if __debug__:
log("recv reply: %s, %s, %s" % (msgid, flags, str(args)[:40]),
level=zLOG.DEBUG)
self.__reply = msgid, flags, args
self.__reply_lock.release() # will fail if lock is unlocked
def handle_request(self, msgid, flags, name, args):
if not self.check_method(name):
msg = "Invalid method name: %s on %s" % (name, repr(self.obj))
raise ZRPCError(msg)
if __debug__:
log("%s%s" % (name, args), level=zLOG.BLATHER)
meth = getattr(self.obj, name)
try:
ret = meth(*args)
except Exception, msg:
error = sys.exc_info()[:2]
log("%s() raised exception: %s" % (name, msg), zLOG.ERROR, error)
return self.return_error(msgid, flags, error[0], error[1])
if flags & ASYNC:
if ret is not None:
raise ZRPCError("async method %s returned value %s" %
(name, repr(ret)))
else:
if __debug__:
log("%s return %s" % (name, short_repr(ret)), zLOG.DEBUG)
if isinstance(ret, Delay):
ret.set_sender(msgid, self.send_reply)
else:
self.send_reply(msgid, ret)
def handle_error(self):
self.log_error()
self.close()
def log_error(self, msg="No error message supplied"):
log(msg, zLOG.ERROR, error=sys.exc_info())
def check_method(self, name):
# XXX Is this sufficient "security" for now?
if name.startswith('_'):
return None
return hasattr(self.obj, name)
def send_reply(self, msgid, ret):
msg = self.marshal.encode(msgid, 0, REPLY, ret)
self.message_output(msg)
def return_error(self, msgid, flags, err_type, err_value):
if flags is None:
self.log_error("Exception raised during decoding")
return
if flags & ASYNC:
self.log_error("Asynchronous call raised exception: %s" % self)
return
if type(err_value) is not types.InstanceType:
err_value = err_type, err_value
try:
msg = self.marshal.encode(msgid, 0, REPLY, (err_type, err_value))
except self.marshal.errors:
err = ZRPCError("Couldn't pickle error %s" % `err_value`)
msg = self.marshal.encode(msgid, 0, REPLY, (ZRPCError, err))
self.message_output(msg)
self._do_async_poll()
# The next two public methods (call and callAsync) are used by
# clients to invoke methods on remote objects
def call(self, method, *args):
self.__call_lock.acquire()
try:
return self._call(method, args)
finally:
self.__call_lock.release()
def _call(self, method, args):
if self.closed:
raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid = self.msgid + 1
if __debug__:
log("send msg: %d, 0, %s, ..." % (msgid, method))
self.message_output(self.marshal.encode(msgid, 0, method, args))
# XXX implementation of promises starts here
self.__reply = None
# reply lock is currently held
self._do_async_loop()
# reply lock is held again...
r_msgid, r_flags, r_args = self.__reply
self.__reply_lock.acquire()
assert r_msgid == msgid, "%s != %s: %s" % (r_msgid, msgid, r_args)
if type(r_args) == types.TupleType \
and type(r_args[0]) == types.ClassType \
and issubclass(r_args[0], Exception):
raise r_args[1] # error raised by server
return r_args
def callAsync(self, method, *args):
self.__call_lock.acquire()
try:
self._callAsync(method, args)
finally:
self.__call_lock.release()
def _callAsync(self, method, args):
if self.closed:
raise DisconnectedError("This action is temporarily unavailable")
msgid = self.msgid
self.msgid += 1
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
# XXX The message won't go out right away in this case. It
# will wait for the asyncore loop to get control again. Seems
# okay to comment our for now, but need to understand better.
self._do_async_poll()
# handle IO, possibly in async mode
def _prepare_async(self):
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
# XXX If we are not in async mode, this will cause dead
# Connections to be leaked.
def set_async(self, map):
self.trigger = trigger()
self.thr_async = 1
def is_async(self):
if self.thr_async:
return 1
else:
return 0
def _do_async_loop(self):
"Invoke asyncore mainloop and wait for reply."
if __debug__:
log("_do_async_loop() async=%d" % self.is_async(),
level=zLOG.DEBUG)
if self.is_async():
self.trigger.pull_trigger()
self.__reply_lock.acquire()
# wait until reply...
else:
# Do loop only if lock is already acquired. XXX But can't
# we already guarantee that the lock is already acquired?
while not self.__reply_lock.acquire(0):
asyncore.poll(10.0, self._map)
if self.closed:
raise DisconnectedError()
self.__reply_lock.release()
def _do_async_poll(self, wait_for_reply=0):
"Invoke asyncore mainloop to get pending message out."
if __debug__:
log("_do_async_poll(), async=%d" % self.is_async(),
level=zLOG.DEBUG)
if self.is_async():
self.trigger.pull_trigger()
else:
asyncore.poll(0.0, self._map)
class ServerConnection(Connection):
"""Connection on the server side"""
# The server side does not send a protocol message. Instead, it
# adapts to whatever the client sends it.
class ManagedServerConnection(ServerConnection):
"""A connection that notifies its ConnectionManager of closing"""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
self.__super_init(sock, addr, obj)
obj.notifyConnected(self)
def close(self):
self.__super_close()
self.__mgr.close(self)
class ManagedConnection(Connection):
"""A connection that notifies its ConnectionManager of closing.
A managed connection also defers the ThreadedAsync work to its
manager.
"""
__super_init = Connection.__init__
__super_close = Connection.close
def __init__(self, sock, addr, obj, mgr):
self.__mgr = mgr
self.__super_init(sock, addr, obj)
self.check_mgr_async()
def close_trigger(self):
# the manager should actually close the trigger
del self.trigger
def set_async(self, map):
pass
def _prepare_async(self):
# Don't do the register_loop_callback that the superclass does
pass
def check_mgr_async(self):
if not self.thr_async and self.__mgr.thr_async:
assert self.__mgr.trigger is not None, \
"manager (%s) has no trigger" % self.__mgr
self.thr_async = 1
self.trigger = self.__mgr.trigger
return 1
return 0
def is_async(self):
if self.thr_async:
return 1
return self.check_mgr_async()
def close(self):
self.__super_close()
self.__mgr.notify_closed(self)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
from ZODB import POSException
from ZEO.Exceptions import Disconnected
class ZRPCError(POSException.StorageError):
pass
class DecodingError(ZRPCError):
"""A ZRPC message could not be decoded."""
class DisconnectedError(ZRPCError, Disconnected):
"""The database storage is disconnected from the storage server."""
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import os
import types
import zLOG
_label = "zrpc:%s" % os.getpid()
def new_label():
global _label
_label = "zrpc:%s" % os.getpid()
def log(message, level=zLOG.BLATHER, label=None, error=None):
zLOG.LOG(label or _label, level, message, error=error)
REPR_LIMIT = 40
def short_repr(obj):
"Return an object repr limited to REPR_LIMIT bytes."
# Some of the objects being repr'd are large strings. It's wastes
# a lot of memory to repr them and then truncate, so special case
# them in this function.
# Also handle short repr of a tuple containing a long string.
if isinstance(obj, types.StringType):
obj = obj[:REPR_LIMIT]
elif isinstance(obj, types.TupleType):
elts = []
size = 0
for elt in obj:
r = repr(elt)
elts.append(r)
size += len(r)
if size > REPR_LIMIT:
break
obj = tuple(elts)
return repr(obj)[:REPR_LIMIT]
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import cPickle
from cStringIO import StringIO
import struct
import types
from ZEO.zrpc.error import ZRPCError
class Marshaller:
"""Marshal requests and replies to second across network"""
# It's okay to share a single Pickler as long as it's in fast
# mode, which means that it doesn't have a memo.
pickler = cPickle.Pickler()
pickler.fast = 1
pickle = pickler.dump
errors = (cPickle.UnpickleableError,
cPickle.UnpicklingError,
cPickle.PickleError,
cPickle.PicklingError)
VERSION = 1
def encode(self, msgid, flags, name, args):
"""Returns an encoded message"""
return self.pickle((msgid, flags, name, args), 1)
def decode(self, msg):
"""Decodes msg and returns its parts"""
unpickler = cPickle.Unpickler(StringIO(msg))
unpickler.find_global = find_global
try:
return unpickler.load() # msgid, flags, name, args
except (self.errors, IndexError), err_msg:
log("can't decode %s" % repr(msg), level=zLOG.ERROR)
raise DecodingError(msg)
_globals = globals()
_silly = ('__doc__',)
def find_global(module, name):
"""Helper for message unpickler"""
try:
m = __import__(module, _globals, _globals, _silly)
except ImportError, msg:
raise ZRPCError("import error %s: %s" % (module, msg))
try:
r = getattr(m, name)
except AttributeError:
raise ZRPCError("module %s has no global %s" % (module, name))
safe = getattr(r, '__no_side_effects__', 0)
if safe:
return r
# XXX what's a better way to do this? esp w/ 2.1 & 2.2
if type(r) == types.ClassType and issubclass(r, Exception):
return r
raise ZRPCError("Unsafe global: %s.%s" % (module, name))
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
import asyncore
import socket
import types
from ZEO.zrpc.connection import Connection, Delay
from ZEO.zrpc.log import log
# Export the main asyncore loop
loop = asyncore.loop
class Dispatcher(asyncore.dispatcher):
"""A server that accepts incoming RPC connections"""
__super_init = asyncore.dispatcher.__init__
reuse_addr = 1
def __init__(self, addr, factory=Connection, reuse_addr=None):
self.__super_init()
self.addr = addr
self.factory = factory
self.clients = []
if reuse_addr is not None:
self.reuse_addr = reuse_addr
self._open_socket()
def _open_socket(self):
if type(self.addr) == types.TupleType:
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
else:
self.create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.set_reuse_addr()
log("listening on %s" % str(self.addr))
self.bind(self.addr)
self.listen(5)
def writable(self):
return 0
def readable(self):
return 1
def handle_accept(self):
try:
sock, addr = self.accept()
except socket.error, msg:
log("accepted failed: %s" % msg)
return
c = self.factory(sock, addr)
log("connect from %s: %s" % (repr(addr), c))
self.clients.append(c)
##############################################################################
#
# Copyright (c) 2001, 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
# This module is a simplified version of the select_trigger module
# from Sam Rushing's Medusa server.
import asyncore
import os
import socket
import thread
if os.name == 'posix':
class trigger (asyncore.file_dispatcher):
"Wake up a call to select() running in the main thread"
# This is useful in a context where you are using Medusa's I/O
# subsystem to deliver data, but the data is generated by another
# thread. Normally, if Medusa is in the middle of a call to
# select(), new output data generated by another thread will have
# to sit until the call to select() either times out or returns.
# If the trigger is 'pulled' by another thread, it should immediately
# generate a READ event on the trigger object, which will force the
# select() invocation to return.
# A common use for this facility: letting Medusa manage I/O for a
# large number of connections; but routing each request through a
# thread chosen from a fixed-size thread pool. When a thread is
# acquired, a transaction is performed, but output data is
# accumulated into buffers that will be emptied more efficiently
# by Medusa. [picture a server that can process database queries
# rapidly, but doesn't want to tie up threads waiting to send data
# to low-bandwidth connections]
# The other major feature provided by this class is the ability to
# move work back into the main thread: if you call pull_trigger()
# with a thunk argument, when select() wakes up and receives the
# event it will call your thunk from within that thread. The main
# purpose of this is to remove the need to wrap thread locks around
# Medusa's data structures, which normally do not need them. [To see
# why this is true, imagine this scenario: A thread tries to push some
# new data onto a channel's outgoing data queue at the same time that
# the main thread is trying to remove some]
def __init__ (self):
r, w = os.pipe()
self.trigger = w
asyncore.file_dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
def close(self):
self.del_channel()
self.socket.close() # the read side of the pipe
os.close(self.trigger) # the write side of the pipe
def __repr__ (self):
return '<select-trigger (pipe) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
# print 'PULL_TRIGGER: ', len(self.thunks)
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
os.write (self.trigger, 'x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
else:
# win32-safe version
class trigger (asyncore.dispatcher):
address = ('127.9.9.9', 19999)
def __init__ (self):
a = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
w = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
# set TCP_NODELAY to true to avoid buffering
w.setsockopt(socket.IPPROTO_TCP, 1, 1)
# tricky: get a pair of connected sockets
host='127.0.0.1'
port=19999
while 1:
try:
self.address=(host, port)
a.bind(self.address)
break
except:
if port <= 19950:
raise 'Bind Error', 'Cannot bind trigger!'
port=port - 1
a.listen (1)
w.setblocking (0)
try:
w.connect (self.address)
except:
pass
r, addr = a.accept()
a.close()
w.setblocking (1)
self.trigger = w
asyncore.dispatcher.__init__ (self, r)
self.lock = thread.allocate_lock()
self.thunks = []
self._trigger_connected = 0
def __repr__ (self):
return '<select-trigger (loopback) at %x>' % id(self)
def readable (self):
return 1
def writable (self):
return 0
def handle_connect (self):
pass
def pull_trigger (self, thunk=None):
if thunk:
try:
self.lock.acquire()
self.thunks.append (thunk)
finally:
self.lock.release()
self.trigger.send ('x')
def handle_read (self):
self.recv (8192)
try:
self.lock.acquire()
for thunk in self.thunks:
try:
thunk()
except:
(file, fun, line), t, v, tbinfo = asyncore.compact_traceback()
print 'exception in trigger thunk: (%s:%s %s)' % (t, v, tbinfo)
self.thunks = []
finally:
self.lock.release()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment