Commit 06e19b50 authored by Tres Seaver's avatar Tres Seaver

Ensure that mailhosts which share a queue directory do not double-deliver mails.

Do this by sharing the thread which processes emails for that directory.

Fixes LP #574286
parent c46eae11
...@@ -36,6 +36,10 @@ Features Added ...@@ -36,6 +36,10 @@ Features Added
Bugs Fixed Bugs Fixed
++++++++++ ++++++++++
- LP #574286: Ensure that mailhosts which share a queue directory do not
double-deliver mails, by sharing the thread which processes emails for
that directory.
- BaseRequest: Fixed handling of errors in 'traverseName'. - BaseRequest: Fixed handling of errors in 'traverseName'.
2.12.5 (2010-04-24) 2.12.5 (2010-04-24)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
$Id$ $Id$
""" """
import logging import logging
from os.path import realpath
import re import re
from cStringIO import StringIO from cStringIO import StringIO
from copy import deepcopy from copy import deepcopy
...@@ -266,32 +267,38 @@ class MailBase(Implicit, Item, RoleManager): ...@@ -266,32 +267,38 @@ class MailBase(Implicit, Item, RoleManager):
force_tls=self.force_tls force_tls=self.force_tls
) )
security.declarePrivate('_getThreadKey')
def _getThreadKey(self):
""" Return the key used to find our processor thread.
"""
return realpath(self.smtp_queue_directory)
@synchronized(lock) @synchronized(lock)
def _stopQueueProcessorThread(self): def _stopQueueProcessorThread(self):
""" Stop thread for processing the mail queue """ """ Stop thread for processing the mail queue.
"""
path = self.absolute_url(1) key = self._getThreadKey()
if queue_threads.has_key(path): if queue_threads.has_key(key):
thread = queue_threads[path] thread = queue_threads[key]
thread.stop() thread.stop()
while thread.isAlive(): while thread.isAlive():
# wait until thread is really dead # wait until thread is really dead
time.sleep(0.3) time.sleep(0.3)
del queue_threads[path] del queue_threads[path]
LOG.info('Thread for %s stopped' % path) LOG.info('Thread for %s stopped' % key)
@synchronized(lock) @synchronized(lock)
def _startQueueProcessorThread(self): def _startQueueProcessorThread(self):
""" Start thread for processing the mail queue """ """ Start thread for processing the mail queue.
"""
path = self.absolute_url(1) key = self._getThreadKey()
if not queue_threads.has_key(path): if not queue_threads.has_key(key):
thread = QueueProcessorThread() thread = QueueProcessorThread()
thread.setMailer(self._makeMailer()) thread.setMailer(self._makeMailer())
thread.setQueuePath(self.smtp_queue_directory) thread.setQueuePath(self.smtp_queue_directory)
thread.start() thread.start()
queue_threads[path] = thread queue_threads[key] = thread
LOG.info('Thread for %s started' % path) LOG.info('Thread for %s started' % key)
security.declareProtected(view, 'queueLength') security.declareProtected(view, 'queueLength')
def queueLength(self): def queueLength(self):
...@@ -307,9 +314,9 @@ class MailBase(Implicit, Item, RoleManager): ...@@ -307,9 +314,9 @@ class MailBase(Implicit, Item, RoleManager):
security.declareProtected(view, 'queueThreadAlive') security.declareProtected(view, 'queueThreadAlive')
def queueThreadAlive(self): def queueThreadAlive(self):
""" return True/False is queue thread is working """ """ return True/False is queue thread is working
"""
th = queue_threads.get(self.absolute_url(1)) th = queue_threads.get(self._getThreadKey())
if th: if th:
return th.isAlive() return th.isAlive()
return False return False
......
...@@ -110,6 +110,14 @@ This is the message body.""" ...@@ -110,6 +110,14 @@ This is the message body."""
self.failUnlessEqual(resto, ['many@example.com']) self.failUnlessEqual(resto, ['many@example.com'])
self.failUnlessEqual(resfrom, 'me@example.com' ) self.failUnlessEqual(resfrom, 'me@example.com' )
def test__getThreadKey_uses_fspath(self):
mh1 = self._makeOne('mh1')
mh1.smtp_queue_directory = '/abc'
mh1.absolute_url = lambda self: 'http://example.com/mh1'
mh2 = self._makeOne('mh2')
mh2.smtp_queue_directory = '/abc'
mh2.absolute_url = lambda self: 'http://example.com/mh2'
self.assertEqual(mh1._getThreadKey(), mh2._getThreadKey())
def testAddressParser( self ): def testAddressParser( self ):
msg = """To: "Name, Nick" <recipient@domain.com>, "Foo Bar" <foo@domain.com> msg = """To: "Name, Nick" <recipient@domain.com>, "Foo Bar" <foo@domain.com>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment