connector.py 6.12 KB
Newer Older
1 2
#
# Copyright (C) 2009  Nexedi SA
3
#
4 5 6 7
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
8
#
9 10 11 12 13 14 15
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
16
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 18 19

import socket
import errno
20 21 22 23 24

# Global connector registry.
# Fill by calling registerConnectorHandler.
# Read by calling getConnectorHandler.
connector_registry = {}
25
DEFAULT_CONNECTOR = 'SocketConnector'
26 27

def registerConnectorHandler(connector_handler):
Vincent Pelletier's avatar
Vincent Pelletier committed
28
    connector_registry[connector_handler.__name__] = connector_handler
29

30 31 32
def getConnectorHandler(connector=None):
    if connector is None:
        connector = DEFAULT_CONNECTOR
Vincent Pelletier's avatar
Vincent Pelletier committed
33 34 35 36 37 38 39
    if isinstance(connector, basestring):
        connector_handler = connector_registry.get(connector)
    else:
        # Allow to directly provide a handler class without requiring to
        # register it first.
        connector_handler = connector
    return connector_handler
40 41

class SocketConnector:
Vincent Pelletier's avatar
Vincent Pelletier committed
42 43 44 45 46 47 48 49 50 51 52 53
    """ This class is a wrapper for a socket """

    is_listening = False
    remote_addr = None
    is_closed = None

    def __init__(self, s=None, accepted_from=None):
        self.accepted_from = accepted_from
        if accepted_from is not None:
            self.remote_addr = accepted_from
            self.is_listening = False
            self.is_closed = False
54
        if s is None:
Vincent Pelletier's avatar
Vincent Pelletier committed
55 56 57
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        else:
            self.socket = s
58 59
        # always use non-blocking sockets
        self.socket.setblocking(0)
60 61
        # disable Nagle algorithm to reduce latency
        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
62

Vincent Pelletier's avatar
Vincent Pelletier committed
63 64 65 66
    def makeClientConnection(self, addr):
        self.is_closed = False
        self.remote_addr = addr
        try:
67 68 69 70 71 72 73 74
            self.socket.connect(addr)
        except socket.error, (err, errmsg):
            if err == errno.EINPROGRESS:
                raise ConnectorInProgressException
            if err == errno.ECONNREFUSED:
                raise ConnectorConnectionRefusedException
            raise ConnectorException, 'makeClientConnection to %s failed:' \
                ' %s:%s' % (addr, err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
75 76 77 78 79 80 81 82 83 84

    def makeListeningConnection(self, addr):
        self.is_closed = False
        self.is_listening = True
        try:
            self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.socket.bind(addr)
            self.socket.listen(5)
        except socket.error, (err, errmsg):
            self.socket.close()
85 86
            raise ConnectorException, 'makeListeningConnection on %s failed:' \
                    ' %s:%s' % (addr, err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
87 88 89 90 91

    def getError(self):
        return self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)

    def getDescriptor(self):
92 93
        # this descriptor must only be used by the event manager, where it
        # guarantee unicity only while the connector is opened and registered
94
        # in epoll
Vincent Pelletier's avatar
Vincent Pelletier committed
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
        return self.socket.fileno()

    def getNewConnection(self):
        try:
            new_s, addr =  self.socket.accept()
            new_s = SocketConnector(new_s, accepted_from=addr)
            return new_s, addr
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            raise ConnectorException, 'getNewConnection failed: %s:%s' % \
                (err, errmsg)

    def shutdown(self):
        # This may fail if the socket is not connected.
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass

    def receive(self):
        try:
            return self.socket.recv(4096)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
            if err == errno.ECONNREFUSED:
                raise ConnectorConnectionRefusedException
            if err == errno.ECONNRESET:
                raise ConnectorConnectionClosedException
            raise ConnectorException, 'receive failed: %s:%s' % (err, errmsg)

    def send(self, msg):
        try:
            return self.socket.send(msg)
        except socket.error, (err, errmsg):
            if err == errno.EAGAIN:
                raise ConnectorTryAgainException
133
            if err in (errno.ECONNRESET, errno.EPIPE):
Vincent Pelletier's avatar
Vincent Pelletier committed
134
                raise ConnectorConnectionClosedException
135
            raise ConnectorException, 'send failed: %s:%s' % (err, errmsg)
Vincent Pelletier's avatar
Vincent Pelletier committed
136 137 138 139 140 141 142 143

    def close(self):
        self.is_closed = True
        return self.socket.close()

    def __repr__(self):
        try:
            fileno = str(self.socket.fileno())
144
        except socket.error:
Vincent Pelletier's avatar
Vincent Pelletier committed
145 146 147 148 149
            fileno = '?'
        result = '<%s at 0x%x fileno %s %s>' % (self.__class__.__name__,
                 id(self), fileno, self.socket.getsockname())
        if self.is_closed is None:
            result += 'never opened'
150
        else:
Vincent Pelletier's avatar
Vincent Pelletier committed
151 152 153 154 155 156 157 158 159 160 161 162 163
            if self.is_closed:
                result += 'closed '
            else:
                result += 'opened '
            if self.is_listening:
                result += 'listening'
            else:
                if self.accepted_from is None:
                    result += 'to'
                else:
                    result += 'from'
                result += ' %s' % (self.remote_addr, )
        return result + '>'
164

165
registerConnectorHandler(SocketConnector)
166

167
class ConnectorException(Exception):
168 169
    pass

170
class ConnectorTryAgainException(ConnectorException):
171 172
    pass

173
class ConnectorInProgressException(ConnectorException):
174 175
    pass

176
class ConnectorConnectionClosedException(ConnectorException):
177 178
    pass

179
class ConnectorConnectionRefusedException(ConnectorException):
180
    pass
181