Commit e2ef1abd authored by Guido van Rossum's avatar Guido van Rossum

*Properly* protect self.thread (renamed from self._thread) with a

lock.  The lock is now never held for a long time, only while
self.thread is being assigned or inspected, and while a thread is
being created, started or stopped.  Waiting for an event or joining a
thread is now done without holding the lock.  There is no longer a
fear of AttributeError.

When a thread join doesn't succeed within 30 seconds, a message is
logged, and depending on the situation, the join is retried.
parent 0016a36e
......@@ -37,10 +37,10 @@ class ConnectionManager:
self.connected = 0
self.connection = None
self.closed = 0
# If _thread is not None, then there is a helper thread
# attempting to connect. _thread is protected by _connect_lock.
self._thread = None
self._connect_lock = threading.Lock()
# If thread is not None, then there is a helper thread
# attempting to connect. thread is protected by thread_lock.
self.thread = None
self.thread_lock = threading.Lock()
self.trigger = None
self.thr_async = 0
ThreadedAsync.register_loop_callback(self.set_async)
......@@ -82,15 +82,17 @@ class ConnectionManager:
def close(self):
"""Prevent ConnectionManager from opening new connections"""
self.closed = 1
self._connect_lock.acquire()
self.thread_lock.acquire()
try:
if self._thread is not None:
# XXX race on _thread
self._thread.stop()
self._thread.join(30)
assert not self._thread.isAlive()
t = self.thread
if t is not None:
t.stop()
finally:
self._connect_lock.release()
self.thread_lock.release()
if t is not None:
t.join(30)
if t.isAlive():
log("ConnectionManager.close(): self.thread.join() timed out")
if self.connection:
self.connection.close()
if self.trigger is not None:
......@@ -126,41 +128,45 @@ class ConnectionManager:
# XXX will a single attempt take too long?
self.connect()
self.thread_lock.acquire()
try:
event = self._thread.one_attempt
except AttributeError:
# An AttributeError means that (1) _thread is None and (2)
# as a consquence of (1) that the connect thread has
# already exited.
pass
else:
t = self.thread
finally:
self.thread_lock.release()
if t is not None:
event = t.one_attempt
event.wait()
return self.connected
def connect(self, sync=0):
if self.connected == 1:
return
self._connect_lock.acquire()
self.thread_lock.acquire()
try:
if self._thread is None:
t = self.thread
if t is None:
log("starting thread to connect to server")
self._thread = ConnectThread(self, self.client, self.addr,
self.tmin, self.tmax)
self._thread.start()
if sync:
try:
self._thread.join()
except AttributeError:
# probably means the thread exited quickly
pass
self.thread = t = ConnectThread(self, self.client, self.addr,
self.tmin, self.tmax)
t.start()
finally:
self._connect_lock.release()
self.thread_lock.release()
if sync:
t.join(30)
while t.isAlive():
log("ConnectionManager.connect(sync=1): "
"self.thread.join() timed out")
t.join(30)
def connect_done(self, c):
log("connect_done()")
self.connected = 1
self.connection = c
self._thread = None
self.thread_lock.acquire()
try:
self.thread = None
finally:
self.thread_lock.release()
def notify_closed(self):
self.connected = 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment