Commit 8676c406 authored by David Wilson's avatar David Wilson

core: make _start_transmit / _stop_transmit async-only

For now at least, these APIs are always used in an asynchronous context,
so stop using the defer mechanism.
parent ee0f21d5
...@@ -1062,11 +1062,11 @@ Broker Class ...@@ -1062,11 +1062,11 @@ Broker Class
Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as Mark the :py:attr:`receive_side <Stream.receive_side>` on `stream` as
not ready for reading. Safe to call from any thread. not ready for reading. Safe to call from any thread.
.. method:: start_transmit (stream) .. method:: _start_transmit (stream)
Mark the :py:attr:`transmit_side <Stream.transmit_side>` on `stream` as Mark the :py:attr:`transmit_side <Stream.transmit_side>` on `stream` as
ready for writing. Safe to call from any thread. When the associated ready for writing. Must only be called from the Broker thread. When the
file descriptor becomes ready for writing, associated file descriptor becomes ready for writing,
:py:meth:`BasicStream.on_transmit` will be called. :py:meth:`BasicStream.on_transmit` will be called.
.. method:: stop_receive (stream) .. method:: stop_receive (stream)
......
...@@ -122,8 +122,8 @@ send(msg)</y:MethodLabel> ...@@ -122,8 +122,8 @@ send(msg)</y:MethodLabel>
</y:NodeLabel> </y:NodeLabel>
<y:UML clipContent="true" constraint="" omitDetails="false" stereotype="" use3DEffect="true"> <y:UML clipContent="true" constraint="" omitDetails="false" stereotype="" use3DEffect="true">
<y:AttributeLabel/> <y:AttributeLabel/>
<y:MethodLabel>start_transmit(strm) <y:MethodLabel>_start_transmit(strm)
stop_transmit(strm)</y:MethodLabel> _stop_transmit(strm)</y:MethodLabel>
</y:UML> </y:UML>
</y:UMLClassNode> </y:UMLClassNode>
</data> </data>
......
...@@ -188,12 +188,12 @@ Stream Classes ...@@ -188,12 +188,12 @@ Stream Classes
.. method:: on_transmit (broker) .. method:: on_transmit (broker)
Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side` Called by :py:class:`Broker` when the stream's :py:attr:`transmit_side`
has been marked writeable using :py:meth:`Broker.start_transmit` and has been marked writeable using :py:meth:`Broker._start_transmit` and
the broker has detected the associated file descriptor is ready for the broker has detected the associated file descriptor is ready for
writing. writing.
Subclasses must implement this method if Subclasses must implement this method if
:py:meth:`Broker.start_transmit` is ever called on them. :py:meth:`Broker._start_transmit` is ever called on them.
.. method:: on_shutdown (broker) .. method:: on_shutdown (broker)
......
...@@ -723,7 +723,7 @@ class BasicStream(object): ...@@ -723,7 +723,7 @@ class BasicStream(object):
def on_disconnect(self, broker): def on_disconnect(self, broker):
LOG.debug('%r.on_disconnect()', self) LOG.debug('%r.on_disconnect()', self)
broker.stop_receive(self) broker.stop_receive(self)
broker.stop_transmit(self) broker._stop_transmit(self)
if self.receive_side: if self.receive_side:
self.receive_side.close() self.receive_side.close()
if self.transmit_side: if self.transmit_side:
...@@ -834,7 +834,7 @@ class Stream(BasicStream): ...@@ -834,7 +834,7 @@ class Stream(BasicStream):
_vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written) _vv and IOLOG.debug('%r.on_transmit() -> len %d', self, written)
if not self._output_buf: if not self._output_buf:
broker.stop_transmit(self) broker._stop_transmit(self)
def _send(self, msg): def _send(self, msg):
_vv and IOLOG.debug('%r._send(%r)', self, msg) _vv and IOLOG.debug('%r._send(%r)', self, msg)
...@@ -842,7 +842,7 @@ class Stream(BasicStream): ...@@ -842,7 +842,7 @@ class Stream(BasicStream):
msg.auth_id, msg.handle, msg.reply_to or 0, msg.auth_id, msg.handle, msg.reply_to or 0,
len(msg.data)) + msg.data len(msg.data)) + msg.data
self._output_buf.append(pkt) self._output_buf.append(pkt)
self._router.broker.start_transmit(self) self._router.broker._start_transmit(self)
def send(self, msg): def send(self, msg):
"""Send `data` to `handle`, and tell the broker we have output. May """Send `data` to `handle`, and tell the broker we have output. May
...@@ -1317,14 +1317,14 @@ class Broker(object): ...@@ -1317,14 +1317,14 @@ class Broker(object):
IOLOG.debug('%r.stop_receive(%r)', self, stream) IOLOG.debug('%r.stop_receive(%r)', self, stream)
self.defer(self._list_discard, self._readers, stream.receive_side) self.defer(self._list_discard, self._readers, stream.receive_side)
def start_transmit(self, stream): def _start_transmit(self, stream):
IOLOG.debug('%r.start_transmit(%r)', self, stream) IOLOG.debug('%r._start_transmit(%r)', self, stream)
assert stream.transmit_side and stream.transmit_side.fd is not None assert stream.transmit_side and stream.transmit_side.fd is not None
self.defer(self._list_add, self._writers, stream.transmit_side) self._list_add(self._writers, stream.transmit_side)
def stop_transmit(self, stream): def _stop_transmit(self, stream):
IOLOG.debug('%r.stop_transmit(%r)', self, stream) IOLOG.debug('%r._stop_transmit(%r)', self, stream)
self.defer(self._list_discard, self._writers, stream.transmit_side) self._list_discard(self._writers, stream.transmit_side)
def _call(self, stream, func): def _call(self, stream, func):
try: try:
......
...@@ -62,14 +62,14 @@ class IoPump(mitogen.core.BasicStream): ...@@ -62,14 +62,14 @@ class IoPump(mitogen.core.BasicStream):
def write(self, s): def write(self, s):
self._output_buf += s self._output_buf += s
self._broker.start_transmit(self) self._broker._start_transmit(self)
def close(self): def close(self):
self._closed = True self._closed = True
# If local process hasn't exitted yet, ensure its write buffer is # If local process hasn't exitted yet, ensure its write buffer is
# drained before lazily triggering disconnect in on_transmit. # drained before lazily triggering disconnect in on_transmit.
if self.transmit_side.fd is not None: if self.transmit_side.fd is not None:
self._broker.start_transmit(self) self._broker._start_transmit(self)
def on_shutdown(self, broker): def on_shutdown(self, broker):
self.close() self.close()
...@@ -83,7 +83,7 @@ class IoPump(mitogen.core.BasicStream): ...@@ -83,7 +83,7 @@ class IoPump(mitogen.core.BasicStream):
self._output_buf = self._output_buf[written:] self._output_buf = self._output_buf[written:]
if not self._output_buf: if not self._output_buf:
broker.stop_transmit(self) broker._stop_transmit(self)
if self._closed: if self._closed:
self.on_disconnect(broker) self.on_disconnect(broker)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment