Commit a4def624 authored by Jérome Perrin's avatar Jérome Perrin Committed by Arnaud Fontaine

CMFActivity py3

parent af9d93d5
...@@ -141,10 +141,10 @@ def sqltest_dict(): ...@@ -141,10 +141,10 @@ def sqltest_dict():
if value is None: # XXX: see comment in SQLBase._getMessageList if value is None: # XXX: see comment in SQLBase._getMessageList
return column + b" IS NULL" return column + b" IS NULL"
for x in value: for x in value:
return b"%s IN (%s)" % (column, str2bytes(', '.join(map( return str2bytes("%s IN (%s)" % (column, ', '.join(map(
str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else str if isinstance(x, _SQLTEST_NO_QUOTE_TYPE_SET) else
render_datetime if isinstance(x, DateTime) else render_datetime if isinstance(x, DateTime) else
render_string, value)))) lambda v: bytes2str(render_string(v)), value))))
return b"0" return b"0"
sqltest_dict[name] = render sqltest_dict[name] = render
_('active_process_uid') _('active_process_uid')
...@@ -246,7 +246,7 @@ def getNow(db): ...@@ -246,7 +246,7 @@ def getNow(db):
Note that this value is not cached, and is not transactionnal on MySQL Note that this value is not cached, and is not transactionnal on MySQL
side. side.
""" """
return db.query("SELECT UTC_TIMESTAMP(6)", 0)[1][0][0] return db.query(b"SELECT UTC_TIMESTAMP(6)", 0)[1][0][0]
class SQLBase(Queue): class SQLBase(Queue):
""" """
...@@ -284,7 +284,7 @@ CREATE TABLE %s ( ...@@ -284,7 +284,7 @@ CREATE TABLE %s (
db = activity_tool.getSQLConnection() db = activity_tool.getSQLConnection()
create = self.createTableSQL() create = self.createTableSQL()
if clear: if clear:
db.query("DROP TABLE IF EXISTS " + self.sql_table) db.query(str2bytes("DROP TABLE IF EXISTS " + self.sql_table))
db.query(create) db.query(create)
else: else:
src = db.upgradeSchema(create, create_if_not_exists=1, src = db.upgradeSchema(create, create_if_not_exists=1,
...@@ -790,7 +790,7 @@ CREATE TABLE %s ( ...@@ -790,7 +790,7 @@ CREATE TABLE %s (
b" %s%s" b" %s%s"
b" ORDER BY priority, date" b" ORDER BY priority, date"
b" LIMIT %i" b" LIMIT %i"
b")" % args).format(*a, *k)) b")" % args).format(*a, **k))
result = Results(query( result = Results(query(
b"SELECT *" b"SELECT *"
b" FROM (%s) AS t" b" FROM (%s) AS t"
...@@ -834,8 +834,8 @@ CREATE TABLE %s ( ...@@ -834,8 +834,8 @@ CREATE TABLE %s (
""" """
Put messages back in given processing_node. Put messages back in given processing_node.
""" """
db.query("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % ( db.query(("UPDATE %s SET processing_node=%s WHERE uid IN (%s)\0COMMIT" % (
self.sql_table, state, ','.join(map(str, uid_list)))) self.sql_table, state, ','.join(map(str, uid_list)))).encode())
def getProcessableMessageLoader(self, db, processing_node): def getProcessableMessageLoader(self, db, processing_node):
# do not merge anything # do not merge anything
...@@ -1042,16 +1042,16 @@ CREATE TABLE %s ( ...@@ -1042,16 +1042,16 @@ CREATE TABLE %s (
return bool(message_list) return bool(message_list)
def deleteMessageList(self, db, uid_list): def deleteMessageList(self, db, uid_list):
db.query("DELETE FROM %s WHERE uid IN (%s)" % ( db.query(str2bytes("DELETE FROM %s WHERE uid IN (%s)" % (
self.sql_table, ','.join(map(str, uid_list)))) self.sql_table, ','.join(map(str, uid_list)))))
def reactivateMessageList(self, db, uid_list, delay, retry): def reactivateMessageList(self, db, uid_list, delay, retry):
db.query("UPDATE %s SET" db.query(str2bytes("UPDATE %s SET"
" date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)" " date = DATE_ADD(UTC_TIMESTAMP(6), INTERVAL %s SECOND)"
"%s WHERE uid IN (%s)" % ( "%s WHERE uid IN (%s)" % (
self.sql_table, delay, self.sql_table, delay,
", retry = retry + 1" if retry else "", ", retry = retry + 1" if retry else "",
",".join(map(str, uid_list)))) ",".join(map(str, uid_list)))))
def finalizeMessageExecution(self, activity_tool, message_list, def finalizeMessageExecution(self, activity_tool, message_list,
uid_to_duplicate_uid_list_dict=None): uid_to_duplicate_uid_list_dict=None):
...@@ -1208,8 +1208,8 @@ CREATE TABLE %s ( ...@@ -1208,8 +1208,8 @@ CREATE TABLE %s (
To simulate time shift, we simply substract delay from To simulate time shift, we simply substract delay from
all dates in message(_queue) table all dates in message(_queue) table
""" """
activity_tool.getSQLConnection().query("UPDATE %s SET" activity_tool.getSQLConnection().query(("UPDATE %s SET"
" date = DATE_SUB(date, INTERVAL %s SECOND)" " date = DATE_SUB(date, INTERVAL %s SECOND)"
% (self.sql_table, delay) % (self.sql_table, delay)
+ ('' if processing_node is None else + ('' if processing_node is None else
"WHERE processing_node=%s" % processing_node)) "WHERE processing_node=%s" % processing_node)).encode())
...@@ -142,10 +142,10 @@ class SQLDict(SQLBase): ...@@ -142,10 +142,10 @@ class SQLDict(SQLBase):
if reserve_uid_list: if reserve_uid_list:
self.assignMessageList(db, processing_node, reserve_uid_list) self.assignMessageList(db, processing_node, reserve_uid_list)
else: else:
db.query("COMMIT") # XXX: useful ? db.query(b"COMMIT") # XXX: useful ?
except: except:
self._log(WARNING, 'Failed to reserve duplicates') self._log(WARNING, 'Failed to reserve duplicates')
db.query("ROLLBACK") db.query(b"ROLLBACK")
raise raise
if uid_list: if uid_list:
self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list) self._log(TRACE, 'Reserved duplicate messages: %r' % uid_list)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment