diff --git a/neo/client/app.py b/neo/client/app.py index 1051b89e76e06de9b5a1b8f81e9c6a7120de5dad..275265c96e542a497f52c401e72704a255c0c704 100644 --- a/neo/client/app.py +++ b/neo/client/app.py @@ -18,7 +18,7 @@ from thread import get_ident from cPickle import dumps from zlib import compress, decompress -from Queue import Queue, Empty +from neo.locking import Queue, Empty from random import shuffle from time import sleep diff --git a/neo/locking.py b/neo/locking.py index 39a02c8fa79a544c67658d57c6921b985cc6c5fc..6c91bfb13ed79163ec13f82940a40c747cb7ca30 100644 --- a/neo/locking.py +++ b/neo/locking.py @@ -1,6 +1,8 @@ from threading import Lock as threading_Lock from threading import RLock as threading_RLock from threading import currentThread +from Queue import Queue as Queue_Queue +from Queue import Empty """ Verbose locking classes. @@ -130,10 +132,46 @@ class VerboseLock(VerboseLockBase): return self.lock.locked() _locked = locked +class VerboseQueue(Queue_Queue): + def __init__(self, maxsize=0): + if maxsize <= 0: + self.put = self._verbose_put + Queue_Queue.__init__(self, maxsize=maxsize) + + def _verbose_note(self, fmt, *args): + sys.stderr.write(fmt % args + '\n') + sys.stderr.flush() + + def get(self, block=True, timeout=None): + note = self._verbose_note + me = '[%r]%s.get(block=%r, timeout=%r)' % (LockUser(), self, block, timeout) + note('%s waiting', me) + try: + result = Queue_Queue.get(self, block=block, timeout=timeout) + except Exception, exc: + note('%s got exeption %r', me, exc) + raise + note('%s got item', me) + return result + + def _verbose_put(self, item, block=True, timeout=None): + note = self._verbose_note + me = '[%r]%s.put(..., block=%r, timeout=%r)' % (LockUser(), self, block, timeout) + try: + Queue_Queue.put(self, item, block=block, timeout=timeout) + except Exception, exc: + note('%s got exeption %r', me, exc) + raise + note('%s put item', me) + + def __repr__(self): + return '<%s@%X>' % (self.__class__.__name__, id(self)) + if VERBOSE_LOCKING: Lock = VerboseLock RLock = VerboseRLock + Queue = VerboseQueue else: Lock = threading_Lock RLock = threading_RLock - + Queue = Queue_Queue