Commit a072087a authored by Jérome Perrin's avatar Jérome Perrin

CMFActivity py3

parent 6f9b0c85
......@@ -140,10 +140,10 @@ def sqltest_dict():
if value is None: # XXX: see comment in SQLBase._getMessageList
return column + b" IS NULL"
for x in value:
return b"%s IN (%s)" % (column, str2bytes(', '.join(map(
return str2bytes("%s IN (%s)" % (column, ', '.join(map(
str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
render_datetime if isinstance(x, DateTime) else
render_string, value))))
lambda v: bytes2str(render_string(v)), value))))
return b"0"
sqltest_dict[name] = render
_('active_process_uid')
......@@ -245,7 +245,7 @@ def getNow(db):
Note that this value is not cached, and is not transactionnal on MySQL
side.
"""
return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
return db.query(b"SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
class SQLBase(Queue):
"""
......@@ -283,7 +283,7 @@ CREATE TABLE %s (
db = activity_tool.getSQLConnection()
create = self.createTableSQL()
if clear:
db.query("DROP TABLE IF EXISTS " + self.sql_table)
db.query(str2bytes("DROP TABLE IF EXISTS " + self.sql_table))
db.query(create)
else:
src = db.upgradeSchema(create, create_if_not_exists=1,
......@@ -788,7 +788,7 @@ CREATE TABLE %s (
b" %s%s"
b" ORDER BY priority, date"
b" LIMIT %i"
b")" % args).format(*a, *k))
b")" % args).format(*a, **k))
result = Results(query(
b"SELECT *"
b" FROM (%s) AS t"
......@@ -832,8 +832,8 @@ CREATE TABLE %s (
"""
Put messages back in given processing_node.
"""
db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, state, ','.join(map(str, uid_list))))
db.query(("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, state, ','.join(map(str, uid_list)))).encode())
def getProcessableMessageLoader(self, db, processing_node):
# do not merge anything
......@@ -1040,16 +1040,16 @@ CREATE TABLE %s (
return not message_list
def deleteMessageList(self, db, uid_list):
db.query("DELETE FROM %s WHERE uid IN (%s)" % (
self.sql_table, ','.join(map(str, uid_list))))
db.query(str2bytes("DELETE FROM %s WHERE uid IN (%s)" % (
self.sql_table, ','.join(map(str, uid_list)))))
def reactivateMessageList(self, db, uid_list, delay, retry):
db.query("UPDATE %s SET"
db.query(str2bytes("UPDATE %s SET"
" date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
"%s WHERE uid IN (%s)" % (
self.sql_table, delay,
", retry = retry + 1" if retry else "",
",".join(map(str, uid_list))))
",".join(map(str, uid_list)))))
def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None):
......@@ -1206,8 +1206,8 @@ CREATE TABLE %s (
To simulate time shift, we simply substract delay from
all dates in message(_queue) table
"""
activity_tool.getSQLConnection().query("UPDATE %s SET"
activity_tool.getSQLConnection().query(("UPDATE %s SET"
" date = DATE_SUB(date, INTERVAL %s SECOND)"
% (self.sql_table, delay)
+ ('' if processing_node is None else
"WHERE processing_node=%s" % processing_node))
"WHERE processing_node=%s" % processing_node)).encode())
......@@ -142,10 +142,10 @@ class SQLDict(SQLBase):
if reserve_uid_list:
self.assignMessageList(db, processing_node, reserve_uid_list)
else:
db.query("COMMIT") # XXX: useful ?
db.query(b"COMMIT") # XXX: useful ?
except:
self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK")
db.query(b"ROLLBACK")
raise
if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
......@@ -619,7 +619,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Monkey patch Queue to induce conflict errors artificially.
def query(self, query_string,*args, **kw):
# Not so nice, this is specific to zsql method
if "REPLACE INTO" in query_string:
if b"REPLACE INTO" in query_string:
raise OperationalError
return self.original_query(query_string,*args, **kw)
......@@ -1026,7 +1026,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
activity_tool = self.getActivityTool()
def delete_volatiles():
for property_id in activity_tool.__dict__.keys():
for property_id in list(six.iterkeys(activity_tool.__dict__)):
if property_id.startswith('_v_'):
delattr(activity_tool, property_id)
organisation_module = self.getOrganisationModule()
......@@ -1142,6 +1142,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.flushAllActivities(silent=1, loop_size=100)
# Check there is a traceback in the email notification
sender, recipients, mail = message_list.pop()
mail = mail.decode()
self.assertIn("Module %s, line %s, in failingMethod" % (
__name__, inspect.getsourcelines(failingMethod)[1]), mail)
self.assertIn("ValueError:", mail)
......@@ -1237,7 +1238,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# Check that cmf_activity SQL connection still works
connection_da = self.portal.cmf_activity_sql_connection()
self.assertFalse(connection_da._registered)
connection_da.query('select 1')
connection_da.query(b'select 1')
self.assertTrue(connection_da._registered)
self.commit()
self.assertFalse(connection_da._registered)
......@@ -1693,7 +1694,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
# This is a one-shot method, revert after execution
SQLDict.dequeueMessage = original_dequeue
result = self.dequeueMessage(activity_tool, processing_node, node_family_id_set)
queue_tic_test_dict['isAlive'] = process_shutdown_thread.isAlive()
queue_tic_test_dict['is_alive'] = process_shutdown_thread.is_alive()
return result
SQLDict.dequeueMessage = dequeueMessage
Organisation.waitingActivity = waitingActivity
......@@ -1717,7 +1718,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.tic()
activity_thread = ActivityThread()
# Do not try to outlive main thread.
activity_thread.setDaemon(True)
activity_thread.daemon = True
# Call process_shutdown in yet another thread because it will wait for
# running activity to complete before returning, and we need to unlock
# activity *after* calling process_shutdown to make sure the next
......@@ -1727,7 +1728,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
activity_tool.process_shutdown(3, 0)
process_shutdown_thread = ProcessShutdownThread()
# Do not try to outlive main thread.
process_shutdown_thread.setDaemon(True)
process_shutdown_thread.daemon = True
activity_thread.start()
# Wait at rendez-vous for activity to arrive.
......@@ -1746,7 +1747,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(len(message_list), 1)
self.assertEqual(message_list[0].method_id, 'getTitle')
# Check that process_shutdown_thread was still runing when Queue_tic returned.
self.assertTrue(queue_tic_test_dict.get('isAlive'), repr(queue_tic_test_dict))
self.assertTrue(queue_tic_test_dict.get('is_alive'), repr(queue_tic_test_dict))
# Call tic in foreground. This must not lead to activity execution.
activity_tool.tic()
self.assertEqual(len(activity_tool.getMessageList()), 1)
......@@ -1894,7 +1895,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
"""
original_query = six.get_unbound_function(DB.query)
def query(self, query_string, *args, **kw):
if query_string.startswith('INSERT'):
if query_string.startswith(b'INSERT'):
insert_list.append(len(query_string))
if not n:
raise Skip
......@@ -2490,7 +2491,7 @@ class TestCMFActivity(ERP5TypeTestCase, LogInterceptor):
self.assertEqual(1, activity_tool.countMessage())
self.flushAllActivities()
sender, recipients, mail = message_list.pop()
self.assertIn('UID mismatch', mail)
self.assertIn(b'UID mismatch', mail)
m, = activity_tool.getMessageList()
self.assertEqual(m.processing_node, INVOKE_ERROR_STATE)
obj.flushActivity()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment