Commit 353aeeac authored by Jeremy Hylton's avatar Jeremy Hylton

Add timeout to try_connecting() method.

The timeout is necessary because in some cases a non-blocking connect
while neither succeed nor fail.  It is unacceptable to simply get
stuck forever in this code, because it is run at least once inside the
ClientStorage constructor.

Set the first timeout to a fairly low number -- 5 seconds -- so that a
ClientStorage(wait=0) call does not take a inordinate length of time.
Then set the default to a longer number -- 75 seconds, which is
typical timeout for a blocking connect() call.

This change fixes the hang in testZEOStorage from testStorageConfig.

XXX Need to get Guido to review the changes.
parent dc363d43
...@@ -283,26 +283,55 @@ class ConnectThread(threading.Thread): ...@@ -283,26 +283,55 @@ class ConnectThread(threading.Thread):
def run(self): def run(self):
delay = self.tmin delay = self.tmin
success = 0 success = 0
# Don't wait too long the first time.
# XXX make timeout configurable?
attempt_timeout = 5
while not self.stopped: while not self.stopped:
success = self.try_connecting() success = self.try_connecting(attempt_timeout)
if not self.one_attempt.isSet(): if not self.one_attempt.isSet():
self.one_attempt.set() self.one_attempt.set()
attempt_timeout = 75
if success > 0: if success > 0:
break break
time.sleep(delay) time.sleep(delay)
delay = min(delay*2, self.tmax) delay = min(delay*2, self.tmax)
log("CT: exiting thread: %s" % self.getName()) log("CT: exiting thread: %s" % self.getName())
def try_connecting(self): def try_connecting(self, timeout):
"""Try connecting to all self.addrlist addresses. """Try connecting to all self.addrlist addresses.
Return 1 if a preferred connection was found; 0 if no Return 1 if a preferred connection was found; 0 if no
connection was found; and -1 if a fallback connection was connection was found; and -1 if a fallback connection was
found. found.
"""
If no connection is found within timeout seconds, return 0.
"""
log("CT: attempting to connect on %d sockets" % len(self.addrlist)) log("CT: attempting to connect on %d sockets" % len(self.addrlist))
deadline = time.time() + timeout
wrappers = self._create_wrappers()
for wrap in wrappers.keys():
if wrap.state == "notified":
return 1
try:
if time.time() > deadline:
return 0
r = self._connect_wrappers(wrappers, deadline)
if r is not None:
return r
if time.time() > deadline:
return 0
r = self._fallback_wrappers(wrappers, deadline)
if r is not None:
return r
# Alas, no luck.
assert not wrappers
finally:
for wrap in wrappers.keys():
wrap.close()
del wrappers
return 0
def _create_wrappers(self):
# Create socket wrappers # Create socket wrappers
wrappers = {} # keys are active wrappers wrappers = {} # keys are active wrappers
for domain, addr in self.addrlist: for domain, addr in self.addrlist:
...@@ -311,12 +340,16 @@ class ConnectThread(threading.Thread): ...@@ -311,12 +340,16 @@ class ConnectThread(threading.Thread):
if wrap.state == "notified": if wrap.state == "notified":
for wrap in wrappers.keys(): for wrap in wrappers.keys():
wrap.close() wrap.close()
return 1 wrappers[wrap] = wrap
return wrappers
if wrap.state != "closed": if wrap.state != "closed":
wrappers[wrap] = wrap wrappers[wrap] = wrap
return wrappers
def _connect_wrappers(self, wrappers, deadline):
# Next wait until they all actually connect (or fail) # Next wait until they all actually connect (or fail)
# XXX If a sockets never connects, nor fails, we'd wait forever! # The deadline is necessary, because we'd wait forever if a
# sockets never connects or fails.
while wrappers: while wrappers:
if self.stopped: if self.stopped:
for wrap in wrappers.keys(): for wrap in wrappers.keys():
...@@ -328,8 +361,11 @@ class ConnectThread(threading.Thread): ...@@ -328,8 +361,11 @@ class ConnectThread(threading.Thread):
if wrap.state == "connecting"] if wrap.state == "connecting"]
if not connecting: if not connecting:
break break
if time.time() > deadline:
break
try: try:
r, w, x = select.select([], connecting, connecting, 1.0) r, w, x = select.select([], connecting, connecting, 1.0)
log("CT: select() %d, %d, %d" % tuple(map(len, (r,w,x))))
except select.error, msg: except select.error, msg:
log("CT: select failed; msg=%s" % str(msg), log("CT: select failed; msg=%s" % str(msg),
level=zLOG.WARNING) # XXX Is this the right level? level=zLOG.WARNING) # XXX Is this the right level?
...@@ -350,6 +386,7 @@ class ConnectThread(threading.Thread): ...@@ -350,6 +386,7 @@ class ConnectThread(threading.Thread):
if wrap.state == "closed": if wrap.state == "closed":
del wrappers[wrap] del wrappers[wrap]
def _fallback_wrappers(self, wrappers, deadline):
# If we've got wrappers left at this point, they're fallback # If we've got wrappers left at this point, they're fallback
# connections. Try notifying them until one succeeds. # connections. Try notifying them until one succeeds.
for wrap in wrappers.keys(): for wrap in wrappers.keys():
...@@ -366,9 +403,6 @@ class ConnectThread(threading.Thread): ...@@ -366,9 +403,6 @@ class ConnectThread(threading.Thread):
assert wrap.state == "closed" assert wrap.state == "closed"
del wrappers[wrap] del wrappers[wrap]
# Alas, no luck.
assert not wrappers
return 0
class ConnectWrapper: class ConnectWrapper:
"""An object that handles the connection procedure for one socket. """An object that handles the connection procedure for one socket.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment