connection.py 19.3 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19
from neo.locking import RLock
Yoshinori Okuji's avatar
Yoshinori Okuji committed
20

21
from neo import protocol
22
from neo.protocol import PacketMalformedError
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23
from neo.event import IdleEvent
24
from neo.connector import ConnectorException, ConnectorTryAgainException, \
25 26
        ConnectorInProgressException, ConnectorConnectionRefusedException, \
        ConnectorConnectionClosedException
27
from neo.util import dump
Yoshinori Okuji's avatar
Yoshinori Okuji committed
28

29 30
def not_closed(func):
    """Decorator refusing to run a method on a closed connection.

    The wrapped method is only called while ``self.connector`` is set;
    otherwise ConnectorConnectionClosedException is raised and the method
    is not invoked.
    """
    def decorator(self, *args, **kw):
        if self.connector is not None:
            return func(self, *args, **kw)
        # No connector any more: the connection has been closed.
        raise ConnectorConnectionClosedException
    return decorator
35

36

37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
def lockCheckWrapper(func):
    """
    Wrap a MT(Client|Server)Connection method to report unlocked calls.

    Ownership is checked through the private "_is_owned" method of the
    RLock class, so this might stop working without notice (sadly, RLock
    does not offer any public "acquired" method, but that one will do as it
    checks that the current thread holds the lock).

    The monitored class is required to keep an RLock instance in its
    self._lock property.
    """
    def wrapper(self, *args, **kw):
        if self._lock._is_owned():
            return func(self, *args, **kw)
        # Lock not held by the caller: log where the call came from.
        import traceback
        logging.warning(
            '%s called on %s instance without being locked. Stack:\n%s',
            func.func_code.co_name,
            self.__class__.__name__,
            ''.join(traceback.format_stack()))
        # This wrapper only reports: the call is performed anyway.
        return func(self, *args, **kw)
    return wrapper

57

Yoshinori Okuji's avatar
Yoshinori Okuji committed
58 59
class BaseConnection(object):
    """A base connection.

    Keeps the event manager, the handler, the address and the connector
    (if any) of a connection. readable() and writable() have to be
    implemented by subclasses.
    """

    def __init__(self, event_manager, handler, connector=None,
                 addr=None, connector_handler=None):
        """Store the collaborators and register on the event manager.

        When a connector is given, its class becomes the connector handler
        and the connection is registered on the event manager at once.
        Otherwise the given connector_handler is kept and registration is
        deferred until setConnector() is called.
        """
        self.em = event_manager
        self.connector = connector
        self.addr = addr
        self.handler = handler
        if connector is not None:
            self.connector_handler = connector.__class__
            event_manager.register(self)
        else:
            self.connector_handler = connector_handler

    def lock(self):
        """No-op lock, always reported as acquired (returns 1)."""
        return 1

    def unlock(self):
        """No-op unlock."""
        return None

    def getConnector(self):
        """Return the connector, or None if there is none."""
        return self.connector

    def setConnector(self, connector):
        """Attach a connector and register on the event manager.

        Passing None does nothing. Raises RuntimeError if a connector is
        already attached.
        """
        if self.connector is not None:
            raise RuntimeError('cannot overwrite a connector in a connection')
        if connector is not None:
            self.connector = connector
            self.em.register(self)

    def getAddress(self):
        """Return the address given at construction time."""
        return self.addr

    def readable(self):
        """Called when the connection is readable (abstract)."""
        raise NotImplementedError

    def writable(self):
        """Called when the connection is writable (abstract)."""
        raise NotImplementedError

    def close(self):
        """Close the connection.

        Removes the connection from the event manager (reader, writer and
        registration), shuts down and closes the connector, then forgets
        it. Does nothing when there is no connector.
        """
        em = self.em
        if self.connector is not None:
            em.removeReader(self)
            em.removeWriter(self)
            em.unregister(self)
            self.connector.shutdown()
            self.connector.close()
            self.connector = None

    # Finalisation runs this class' close(); an override of close() in a
    # subclass is not picked up by this alias.
    __del__ = close

    def getHandler(self):
        """Return the current handler."""
        return self.handler

    def setHandler(self, handler):
        """Replace the current handler."""
        self.handler = handler

    def getEventManager(self):
        """Return the event manager."""
        return self.em

    def getUUID(self):
        """Return None: a base connection carries no UUID."""
        return None

    def isListening(self):
        """Return False (overridden by listening connections)."""
        return False

    def isServer(self):
        """Return False (overridden by server connections)."""
        return False

    def isClient(self):
        """Return False (overridden by client connections)."""
        return False

    def hasPendingMessages(self):
        """Return False: a base connection queues no message."""
        return False

135

Yoshinori Okuji's avatar
Yoshinori Okuji committed
136 137
class ListeningConnection(BaseConnection):
    """A listen connection."""

    def __init__(self, event_manager, handler, addr, connector_handler, **kw):
        """Create a listening connector on addr and wait for readability.

        Extra keyword arguments are accepted and ignored.
        """
        logging.debug('listening to %s:%d', *addr)
        BaseConnection.__init__(self, event_manager, handler,
                                addr = addr,
                                connector_handler = connector_handler)
        listener = self.connector_handler()
        listener.makeListeningConnection(addr)
        self.setConnector(listener)
        self.em.addReader(self)

    def readable(self):
        """Accept a new connection and hand it over to the handler.

        Nothing happens if the connector asks to try again later.
        """
        try:
            accepted, peer = self.connector.getNewConnection()
            logging.debug('accepted a connection from %s:%d', *peer)
            self.handler.connectionAccepted(self, accepted, peer)
        except ConnectorTryAgainException:
            pass

    def writable(self):
        """A listening connection never has anything to write."""
        return False

    def isListening(self):
        """Return True: this connection is a listening one."""
        return True

163

Yoshinori Okuji's avatar
Yoshinori Okuji committed
164 165
class Connection(BaseConnection):
    """A connection.

    Buffers incoming and outgoing data, allocates message IDs, keeps the
    idle events registered for expected messages and queues parsed packets
    until process() passes them to the handler.
    """

    def __init__(self, event_manager, handler,
                 connector=None, addr=None,
                 connector_handler=None):
        """Set up buffers and state, then initialise the base connection.

        When a connector is given, the connection is also added to the
        readers of the event manager.
        """
        self.read_buf = ""
        self.write_buf = ""
        self.cur_id = 0
        # Maps an expected message ID (or None for "any message") to the
        # IdleEvent registered for it.
        self.event_dict = {}
        self.aborted = False
        self.uuid = None
        # Parsed packets waiting for process().
        self._queue = []
        BaseConnection.__init__(self, event_manager, handler,
                                connector=connector, addr=addr,
                                connector_handler=connector_handler)
        if connector is not None:
            event_manager.addReader(self)

    def getUUID(self):
        """Return the UUID set with setUUID(), None by default."""
        return self.uuid

    def setUUID(self, uuid):
        """Set the UUID associated with this connection."""
        self.uuid = uuid

    def _getNextId(self):
        """Return the current message ID and advance the counter."""
        next_id = self.cur_id
        # Deal with an overflow.
        if self.cur_id == 0xffffffff:
            self.cur_id = 0
        else:
            self.cur_id += 1
        return next_id

    def close(self):
        """Close the connection and drop every pending state.

        On top of the base behaviour, all registered idle events are
        removed from the event manager and both buffers are emptied.
        """
        logging.debug('closing a connector for %s (%s:%d)',
                dump(self.uuid), *(self.addr))
        BaseConnection.close(self)
        for event in self.event_dict.values():
            self.em.removeIdleEvent(event)
        self.event_dict.clear()
        self.write_buf = ""
        self.read_buf = ""

    def abort(self):
        """Abort dealing with this connection."""
        logging.debug('aborting a connector for %s (%s:%d)',
                dump(self.uuid), *(self.addr))
        self.aborted = True

    def writable(self):
        """Called when self is writable."""
        self._send()
        if not self.pending():
            # Nothing left to send: an aborted connection can now be
            # closed, a live one just stops polling for writing.
            if self.aborted:
                self.close()
            else:
                self.em.removeWriter(self)

    def readable(self):
        """Called when self is readable."""
        self._recv()
        self.analyse()

        if self.aborted:
            self.em.removeReader(self)

    def analyse(self):
        """Analyse received data.

        Parses as many packets as the read buffer holds. PING packets are
        answered immediately, PONG packets are dropped and any other packet
        is queued for process(). A malformed packet is reported to the
        handler and stops the analysis.
        """
        while True:
            packet = None
            try:
                packet = protocol.parse(self.read_buf)
            except PacketMalformedError as msg:
                # packet is still None here: parsing did not complete.
                self.handler.packetMalformed(self, packet, msg)
                return

            if packet is None:
                break

            # Remove idle events, if appropriate packets were received.
            for msg_id in (None, packet.getId()):
                try:
                    event = self.event_dict[msg_id]
                    del self.event_dict[msg_id]
                    self.em.removeIdleEvent(event)
                except KeyError:
                    pass

            try:
                packet_type = packet.getType()
                if packet_type == protocol.PING:
                    # Send a pong notification
                    self.answer(protocol.pong(), packet.getId())
                elif packet_type != protocol.PONG:
                    # Skip PONG packets, their only purpose is to drop the
                    # IdleEvent generated upon ping.
                    self._queue.append(packet)
            finally:
                # Always consume the packet, even if answering failed.
                self.read_buf = self.read_buf[len(packet):]

    def hasPendingMessages(self):
        """
          Returns True if there are messages queued and awaiting processing.
        """
        return len(self._queue) != 0

    def _enqueue(self, packet):
        """
          Enqueue a parsed packet for future processing.
        """
        self._queue.append(packet)

    def _dequeue(self):
        """
          Dequeue a packet for processing.
        """
        return self._queue.pop(0)

    def process(self):
        """
          Process a pending packet.
        """
        packet = self._dequeue()
        self.logPacket('from', packet)
        self.handler.packetReceived(self, packet)

    def pending(self):
        """Return a true value if the connection is open with data to send.

        The value is only meant to be tested for truth: it is False when
        there is no connector, and the write buffer itself otherwise.
        """
        return self.connector is not None and self.write_buf

    def _recv(self):
        """Receive data from a connector.

        Received data is appended to the read buffer. When the peer closed
        or reset the connection, the connection is closed and the handler
        is notified. Unknown connector errors are re-raised after cleanup.
        """
        try:
            data = self.connector.receive()
            if not data:
                logging.debug('Connection %r closed in recv', self.connector)
                self.close()
                self.handler.connectionClosed(self)
                return
            self.read_buf += data
        except ConnectorTryAgainException:
            pass
        except ConnectorConnectionRefusedException:
            # should only occur while connecting
            self.close()
            self.handler.connectionFailed(self)
        except ConnectorConnectionClosedException:
            # connection reset by peer; according to the man page this
            # error should not occur, but it seems it does
            logging.debug('Connection reset by peer: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
        except ConnectorException:
            logging.debug('Unknown connection error: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
            # unhandled connector exception
            raise

    def _send(self):
        """Send data to a connector.

        Sent bytes are removed from the write buffer. When the connection
        turns out to be closed or reset, the connection is closed and the
        handler is notified. Unknown connector errors are re-raised after
        cleanup.
        """
        if not self.write_buf:
            return
        try:
            n = self.connector.send(self.write_buf)
            if not n:
                logging.debug('Connection %r closed in send', self.connector)
                # Close first, then notify, like every other closed path.
                self.close()
                self.handler.connectionClosed(self)
                return
            self.write_buf = self.write_buf[n:]
        except ConnectorTryAgainException:
            pass
        except ConnectorConnectionClosedException:
            # connection reset by peer
            logging.debug('Connection reset by peer: %r', self.connector)
            self.close()
            self.handler.connectionClosed(self)
        except ConnectorException:
            logging.debug('Unknown connection error: %r', self.connector)
            # unhandled connector exception
            self.close()
            self.handler.connectionClosed(self)
            raise

    def logPacket(self, direction, packet):
        """Log a packet at debug level.

        direction is a short text inserted in the log line; ERROR packets
        are decoded to log their code and message as well.
        """
        packet_type = packet.getType()
        ip, port = self.getAddress()
        if packet_type == protocol.ERROR:
            code, message = packet.decode()
            logging.debug('#0x%08x ERROR %-24s %s %s (%s:%d) %s',
                packet.getId(), code, direction, dump(self.uuid),
                ip, port, message)
        else:
            logging.debug('#0x%08x %-30s %s %s (%s:%d)', packet.getId(),
                packet_type, direction, dump(self.uuid),
                ip, port)

    def _addPacket(self, packet):
        """Add a packet into the write buffer.

        Does nothing on a closed connection. If the packet cannot be
        encoded, an internal error notification is sent instead and its
        message ID is returned.
        """
        if self.connector is None:
            return

        self.logPacket(' to ', packet)
        try:
            self.write_buf += packet.encode()
        except PacketMalformedError as m:
            logging.critical('trying to send a too big message')
            # XXX: we should assert that the internalError packet has a size
            # lower than MAX_PACKET_SIZE
            return self.notify(protocol.internalError(m))

        # If this is the first time, enable polling for writing.
        if self.write_buf:
            self.em.addWriter(self)

    def expectMessage(self, msg_id=None, timeout=5, additional_timeout=30):
        """Expect a message for a reply to a given message ID or any message.

        The purpose of this method is to define how much amount of time is
        acceptable to wait for a message, thus to detect a down or broken
        peer. This is important, because one error may halt a whole cluster
        otherwise. Although TCP defines a keep-alive feature, the timeout
        is too long generally, and it does not detect a certain type of reply,
        thus it is better to probe problems at the application level.

        The message ID specifies what ID is expected. Usually, this should
        be identical with an ID for a request message. If it is None, any
        message is acceptable, so it can be used to check idle time.

        The timeout is the amount of time to wait until keep-alive messages
        start. Once the timeout is expired, the connection starts to ping
        the peer.

        The additional timeout defines the amount of time after the timeout
        to invoke a timeoutExpired callback. If it is zero, no ping is sent,
        and the callback is executed immediately.

        Nothing is registered on a closed connection."""
        if self.connector is None:
            return

        event = IdleEvent(self, msg_id, timeout, additional_timeout)
        self.event_dict[msg_id] = event
        self.em.addIdleEvent(event)

    @not_closed
    def notify(self, packet):
        """ Send a packet with a new ID; return that ID """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self._addPacket(packet)
        return msg_id

    @not_closed
    def ask(self, packet, timeout=5, additional_timeout=30):
        """ Send a packet with a new ID and register the expectation of an
        answer, using the given timeouts (see expectMessage); return the ID
        """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        # Forward the caller's timeouts instead of silently dropping them.
        self.expectMessage(msg_id, timeout, additional_timeout)
        self._addPacket(packet)
        return msg_id

    @not_closed
    def answer(self, packet, msg_id):
        """ Answer to a packet by re-using its ID for the packet answer """
        packet.setId(msg_id)
        self._addPacket(packet)

    def ping(self, timeout=5):
        """ Send a ping and expect to receive a pong notification """
        packet = protocol.ping()
        msg_id = self._getNextId()
        packet.setId(msg_id)
        # No additional timeout: see expectMessage for its meaning.
        self.expectMessage(msg_id, timeout, 0)
        self._addPacket(packet)

438

Yoshinori Okuji's avatar
Yoshinori Okuji committed
439 440
class ClientConnection(Connection):
    """A connection from this node to a remote node."""

    def __init__(self, event_manager, handler, addr, connector_handler, **kw):
        """Start connecting to addr.

        The handler is told that the connection started, then either that
        it completed (immediate success) or that it failed. When the
        connection is still in progress, completion is detected later in
        writable(). Unknown connector errors are re-raised after the
        failure has been reported. Extra keyword arguments are accepted
        and ignored.
        """
        self.connecting = True
        Connection.__init__(self, event_manager, handler, addr = addr,
                            connector_handler = connector_handler)
        handler.connectionStarted(self)
        try:
            new_connector = self.connector_handler()
            self.setConnector(new_connector)
            try:
                new_connector.makeClientConnection(addr)
            except ConnectorInProgressException:
                # Completion will be signalled by writability.
                event_manager.addWriter(self)
            else:
                self.connecting = False
                self.handler.connectionCompleted(self)
                event_manager.addReader(self)
        except ConnectorConnectionRefusedException:
            handler.connectionFailed(self)
            self.close()
        except ConnectorException:
            # unhandled connector exception
            handler.connectionFailed(self)
            self.close()
            raise

    def writable(self):
        """Called when self is writable."""
        if not self.connecting:
            Connection.writable(self)
            return
        # Still connecting: writability tells the attempt has finished.
        if self.connector.getError():
            self.handler.connectionFailed(self)
            self.close()
            return
        self.connecting = False
        self.handler.connectionCompleted(self)
        self.em.addReader(self)

    def isClient(self):
        """Return True: this connection is a client one."""
        return True

485

Yoshinori Okuji's avatar
Yoshinori Okuji committed
486 487
class ServerConnection(Connection):
    """A connection from a remote node to this node."""

    def isServer(self):
        """Return True: this connection is a server one."""
        return True
491

492

493 494
class MTClientConnection(ClientConnection):
    """A Multithread-safe version of ClientConnection.

    Every instance owns an RLock. Methods wrapped with lockCheckWrapper
    log a warning when they are called without that lock being held.
    """

    def __init__(self, *args, **kwargs):
        """Create the lock and initialise the connection while holding it.

        A 'dispatcher' keyword argument is required; it is removed from
        kwargs before they are passed to ClientConnection.
        """
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock = RLock()
        self.acquire = lock.acquire
        self.release = lock.release
        self.dispatcher = kwargs.pop('dispatcher')
        self.lock()
        try:
            super(MTClientConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking=1):
        """Acquire the connection lock; return what acquire() returns."""
        return self.acquire(blocking=blocking)

    def unlock(self):
        """Release the connection lock."""
        self.release()

    @lockCheckWrapper
    def writable(self, *args, **kw):
        return super(MTClientConnection, self).writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        return super(MTClientConnection, self).readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        return super(MTClientConnection, self).analyse(*args, **kw)

    @lockCheckWrapper
    def expectMessage(self, *args, **kw):
        return super(MTClientConnection, self).expectMessage(*args, **kw)

    @lockCheckWrapper
    def notify(self, *args, **kw):
        return super(MTClientConnection, self).notify(*args, **kw)

    @lockCheckWrapper
    def ask(self, queue, packet, timeout=5, additional_timeout=30):
        """Send a packet with a new ID and expect an answer to it.

        The message ID is registered on the dispatcher together with the
        given queue before the packet is sent. Returns the message ID.
        """
        msg_id = self._getNextId()
        packet.setId(msg_id)
        self.dispatcher.register(self, msg_id, queue)
        # Forward the caller's timeouts instead of silently dropping them.
        self.expectMessage(msg_id, timeout, additional_timeout)
        self._addPacket(packet)
        return msg_id

    @lockCheckWrapper
    def answer(self, *args, **kw):
        return super(MTClientConnection, self).answer(*args, **kw)

    def close(self):
        """Close the connection while holding the connection lock."""
        self.lock()
        try:
            super(MTClientConnection, self).close()
        finally:
            self.unlock()
553

554

555 556
class MTServerConnection(ServerConnection):
    """A Multithread-safe version of ServerConnection."""

    def __init__(self, *args, **kwargs):
        """Create the lock and initialise the connection while holding it."""
        lock = RLock()
        # _lock is only here for lock debugging purposes. Do not use.
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release
        self.lock()
        try:
            super(MTServerConnection, self).__init__(*args, **kwargs)
        finally:
            self.unlock()

    def lock(self, blocking = 1):
        """Acquire the connection lock; return what acquire() returns."""
        return self.acquire(blocking = blocking)

    def unlock(self):
        """Release the connection lock."""
        self.release()

    # The methods below only delegate to ServerConnection; lockCheckWrapper
    # logs a warning when they are called without the lock being held.

    @lockCheckWrapper
    def writable(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.writable(*args, **kw)

    @lockCheckWrapper
    def readable(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.readable(*args, **kw)

    @lockCheckWrapper
    def analyse(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.analyse(*args, **kw)

    @lockCheckWrapper
    def expectMessage(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.expectMessage(*args, **kw)

    @lockCheckWrapper
    def notify(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.notify(*args, **kw)

    @lockCheckWrapper
    def ask(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.ask(*args, **kw)

    @lockCheckWrapper
    def answer(self, *args, **kw):
        parent = super(MTServerConnection, self)
        return parent.answer(*args, **kw)
602