Commit 6abb78ae authored by Jeremy Hylton's avatar Jeremy Hylton

Major refactoring of the rpc locking mechanisms.

Add a send_call() method that computes a new msgid and hands the
message off to the smac layer.  Uses __msgid_lock()

call() still uses __call_lock, but callAsync() does not.  callAsync()
does not use any lock beyond what send_call() does.
parent ad9fb39e
...@@ -137,8 +137,12 @@ class Connection(smac.SizedMessageAsyncConnection): ...@@ -137,8 +137,12 @@ class Connection(smac.SizedMessageAsyncConnection):
self.trigger = None self.trigger = None
self._prepare_async() self._prepare_async()
self._map = {self._fileno: self} self._map = {self._fileno: self}
# __msgid_lock guards access to msgid
self.__msgid_lock = threading.Lock()
# __call_lock prevents more than one synchronous call from
# being issued at one time.
self.__call_lock = threading.Lock() self.__call_lock = threading.Lock()
# The reply lock is used to block when a synchronous call is # __reply_lock is used to block when a synchronous call is
# waiting for a response # waiting for a response
self.__reply_lock = threading.Lock() self.__reply_lock = threading.Lock()
self.__reply_lock.acquire() self.__reply_lock.acquire()
...@@ -281,6 +285,20 @@ class Connection(smac.SizedMessageAsyncConnection): ...@@ -281,6 +285,20 @@ class Connection(smac.SizedMessageAsyncConnection):
# The next two public methods (call and callAsync) are used by # The next two public methods (call and callAsync) are used by
# clients to invoke methods on remote objects # clients to invoke methods on remote objects
def send_call(self, method, args, flags):
# send a message and return its msgid
self.__msgid_lock.acquire()
try:
msgid = self.msgid
self.msgid = self.msgid + 1
finally:
self.__msgid_lock.release()
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, flags, method))
buf = self.marshal.encode(msgid, flags, method, args)
self.message_output(buf)
return msgid
def call(self, method, *args): def call(self, method, *args):
self.__call_lock.acquire() self.__call_lock.acquire()
try: try:
...@@ -291,14 +309,7 @@ class Connection(smac.SizedMessageAsyncConnection): ...@@ -291,14 +309,7 @@ class Connection(smac.SizedMessageAsyncConnection):
def _call(self, method, args): def _call(self, method, args):
if self.closed: if self.closed:
raise DisconnectedError() raise DisconnectedError()
msgid = self.msgid msgid = self.send_call(method, args, 0)
self.msgid = self.msgid + 1
if __debug__:
log("send msg: %d, 0, %s, ..." % (msgid, method))
self.message_output(self.marshal.encode(msgid, 0, method, args))
# XXX implementation of promises would start here
self.__reply = None self.__reply = None
self.wait() # will release reply lock before returning self.wait() # will release reply lock before returning
r_msgid, r_flags, r_args = self.__reply r_msgid, r_flags, r_args = self.__reply
...@@ -314,20 +325,9 @@ class Connection(smac.SizedMessageAsyncConnection): ...@@ -314,20 +325,9 @@ class Connection(smac.SizedMessageAsyncConnection):
return r_args return r_args
def callAsync(self, method, *args): def callAsync(self, method, *args):
self.__call_lock.acquire()
try:
self._callAsync(method, args)
finally:
self.__call_lock.release()
def _callAsync(self, method, args):
if self.closed: if self.closed:
raise DisconnectedError() raise DisconnectedError()
msgid = self.msgid self.send_call(method, args, ASYNC)
self.msgid += 1
if __debug__:
log("send msg: %d, %d, %s, ..." % (msgid, ASYNC, method))
self.message_output(self.marshal.encode(msgid, ASYNC, method, args))
self.poll() self.poll()
# handle IO, possibly in async mode # handle IO, possibly in async mode
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment