Commit 4859cd0a authored by Julien Muchembled's avatar Julien Muchembled

CMFActivity: more refactoring between SQLDict & SQLQueue

parent bb2bfa6e
...@@ -227,7 +227,9 @@ class Queue(object): ...@@ -227,7 +227,9 @@ class Queue(object):
return 0 return 0
def countMessageWithTag(self, activity_tool,value): def countMessageWithTag(self, activity_tool,value):
return 0 """Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
# Transaction Management # Transaction Management
def prepareQueueMessageList(self, activity_tool, message_list): def prepareQueueMessageList(self, activity_tool, message_list):
......
...@@ -130,12 +130,50 @@ class SQLBase(Queue): ...@@ -130,12 +130,50 @@ class SQLBase(Queue):
processing=line.processing) processing=line.processing)
for line in result] for line in result]
def _getPriority(self, activity_tool, method, default): def countMessage(self, activity_tool, tag=None, path=None,
result = method() method_id=None, message_uid=None, **kw):
if not result: """Return the number of messages which match the given parameters.
return default """
if isinstance(tag, str):
tag = [tag]
if isinstance(path, str):
path = [path]
if isinstance(method_id, str):
method_id = [method_id]
result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
method_id=method_id,
path=path,
message_uid=message_uid,
tag=tag,
serialization_tag=None,
count=1)
return result[0].uid_count
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None,
active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLBase_hasMessage', None)
if hasMessage is not None:
if object is None:
path = None
else:
path = '/'.join(object.getPhysicalPath())
result = hasMessage(table=self.sql_table, path=path, method_id=method_id,
only_valid=only_valid, active_process_uid=active_process_uid)
if result:
return result[0].message_count > 0
return 0
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
return [Message.load(line.message, uid=line.uid, line=line)
for line in activity_tool.SQLBase_dumpMessageList(table=self.sql_table)]
def getPriority(self, activity_tool):
result = activity_tool.SQLBase_getPriority(table=self.sql_table)
if result:
assert len(result) == 1, len(result) assert len(result) == 1, len(result)
return result[0]['priority'] return result[0]['priority']
return Queue.getPriority(self, activity_tool)
def _retryOnLockError(self, method, args=(), kw={}): def _retryOnLockError(self, method, args=(), kw={}):
while True: while True:
...@@ -146,6 +184,36 @@ class SQLBase(Queue): ...@@ -146,6 +184,36 @@ class SQLBase(Queue):
# a lock error into a conflict error. # a lock error into a conflict error.
LOG('SQLBase', INFO, 'Got a lock error, retrying...') LOG('SQLBase', INFO, 'Got a lock error, retrying...')
# Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag or serialization_tag:
result = activity_tool.SQLBase_validateMessageList(table=self.sql_table,
method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag,
count=False,
serialization_tag=serialization_tag)
message_list = []
for line in result:
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m)
return message_list
def _validate_after_method_id(self, activity_tool, message, value): def _validate_after_method_id(self, activity_tool, message, value):
return self._validate(activity_tool, method_id=value) return self._validate(activity_tool, method_id=value)
...@@ -565,3 +633,12 @@ class SQLBase(Queue): ...@@ -565,3 +633,12 @@ class SQLBase(Queue):
invoke(Message.load(line.message, uid=line.uid, line=line)) invoke(Message.load(line.message, uid=line.uid, line=line))
if uid_list: if uid_list:
activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list) activity_tool.SQLBase_delMessage(table=self.sql_table, uid=uid_list)
# Required for tests
def timeShift(self, activity_tool, delay, processing_node=None):
"""
To simulate time shift, we simply substract delay from
all dates in message(_queue) table
"""
activity_tool.SQLBase_timeShift(table=self.sql_table, delay=delay,
processing_node=processing_node)
...@@ -191,29 +191,6 @@ class SQLDict(SQLBase): ...@@ -191,29 +191,6 @@ class SQLDict(SQLBase):
return None, original_uid, [uid] return None, original_uid, [uid]
return load return load
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLDict_hasMessage', None)
if hasMessage is not None:
if object is None:
my_object_path = None
else:
my_object_path = '/'.join(object.getPhysicalPath())
result = hasMessage(path=my_object_path, method_id=method_id, only_valid=only_valid, active_process_uid=active_process_uid)
if len(result) > 0:
return result[0].message_count > 0
return 0
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
dumpMessageList = getattr(activity_tool, 'SQLDict_dumpMessageList', None)
if dumpMessageList is not None:
result = dumpMessageList()
for line in result:
m = Message.load(line.message, uid=line.uid, line=line)
message_list.append(m)
return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
offset = 0 offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None) assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
...@@ -289,72 +266,4 @@ class SQLDict(SQLBase): ...@@ -289,72 +266,4 @@ class SQLDict(SQLBase):
return return
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
# Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag or serialization_tag:
validateMessageList = activity_tool.SQLDict_validateMessageList
result = validateMessageList(method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag,
count=False,
serialization_tag=serialization_tag)
message_list = []
for line in result:
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = line.order_validation_text
message_list.append(m)
return message_list
else:
return []
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters.
"""
if isinstance(tag, str):
tag = [tag]
if isinstance(path, str):
path = [path]
if isinstance(method_id, str):
method_id = [method_id]
result = activity_tool.SQLDict_validateMessageList(method_id=method_id,
path=path,
message_uid=message_uid,
tag=tag,
serialization_tag=None,
count=1)
return result[0].uid_count
def countMessageWithTag(self, activity_tool, value):
"""Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
# Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node=None, retry=None):
"""
To simulate timeShift, we simply substract delay from
all dates in SQLDict message table
"""
activity_tool.SQLDict_timeShift(delay=delay, processing_node=processing_node,retry=retry)
def getPriority(self, activity_tool):
method = activity_tool.SQLDict_getPriority
default = SQLBase.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLDict) registerActivity(SQLDict)
...@@ -85,52 +85,6 @@ class SQLQueue(SQLBase): ...@@ -85,52 +85,6 @@ class SQLQueue(SQLBase):
processing_node_list=processing_node_list, processing_node_list=processing_node_list,
serialization_tag_list=serialization_tag_list) serialization_tag_list=serialization_tag_list)
def hasActivity(self, activity_tool, object, method_id=None, only_valid=None, active_process_uid=None):
hasMessage = getattr(activity_tool, 'SQLQueue_hasMessage', None)
if hasMessage is not None:
if object is None:
my_object_path = None
else:
my_object_path = '/'.join(object.getPhysicalPath())
result = hasMessage(path=my_object_path, method_id=method_id, only_valid=only_valid, active_process_uid=active_process_uid)
if len(result) > 0:
return result[0].message_count > 0
return 0
def countMessage(self, activity_tool, tag=None, path=None,
method_id=None, message_uid=None, **kw):
"""Return the number of messages which match the given parameters.
"""
if isinstance(tag, str):
tag = [tag]
if isinstance(path, str):
path = [path]
if isinstance(method_id, str):
method_id = [method_id]
result = activity_tool.SQLQueue_validateMessageList(method_id=method_id,
path=path,
message_uid=message_uid,
tag=tag,
serialization_tag=None,
count=1)
return result[0].uid_count
def countMessageWithTag(self, activity_tool, value):
"""Return the number of messages which match the given tag.
"""
return self.countMessage(activity_tool, tag=value)
def dumpMessageList(self, activity_tool):
# Dump all messages in the table.
message_list = []
dumpMessageList = getattr(activity_tool, 'SQLQueue_dumpMessageList', None)
if dumpMessageList is not None:
result = dumpMessageList()
for line in result:
m = Message.load(line.message, uid=line.uid, line=line)
message_list.append(m)
return message_list
def distribute(self, activity_tool, node_count): def distribute(self, activity_tool, node_count):
offset = 0 offset = 0
assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None) assignMessage = getattr(activity_tool, 'SQLBase_assignMessage', None)
...@@ -182,49 +136,4 @@ class SQLQueue(SQLBase): ...@@ -182,49 +136,4 @@ class SQLQueue(SQLBase):
return return
offset += READ_MESSAGE_LIMIT offset += READ_MESSAGE_LIMIT
# Validation private methods
def _validate(self, activity_tool, method_id=None, message_uid=None, path=None, tag=None,
serialization_tag=None):
if isinstance(method_id, str):
method_id = [method_id]
if isinstance(path, str):
path = [path]
if isinstance(tag, str):
tag = [tag]
if method_id or message_uid or path or tag or serialization_tag:
validateMessageList = activity_tool.SQLQueue_validateMessageList
result = validateMessageList(method_id=method_id,
message_uid=message_uid,
path=path,
tag=tag,
count=False,
serialization_tag=serialization_tag)
message_list = []
for line in result:
m = Message.load(line.message,
line=line,
uid=line.uid,
date=line.date,
processing_node=line.processing_node)
if not hasattr(m, 'order_validation_text'): # BBB
m.order_validation_text = self.getOrderValidationText(m)
message_list.append(m)
return message_list
else:
return []
# Required for tests (time shift)
def timeShift(self, activity_tool, delay, processing_node = None):
"""
To simulate timeShift, we simply substract delay from
all dates in SQLQueue message table
"""
activity_tool.SQLQueue_timeShift(delay=delay, processing_node=processing_node)
def getPriority(self, activity_tool):
method = activity_tool.SQLQueue_getPriority
default = SQLBase.getPriority(self, activity_tool)
return self._getPriority(activity_tool, method, default)
registerActivity(SQLQueue) registerActivity(SQLQueue)
...@@ -7,8 +7,9 @@ cache_time:0 ...@@ -7,8 +7,9 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params></params> <params>table
</params>
SELECT * FROM SELECT * FROM
message <dtml-var table>
ORDER BY ORDER BY
uid uid
...@@ -7,10 +7,10 @@ cache_time:0 ...@@ -7,10 +7,10 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params> <params>table
</params> </params>
SELECT `priority` FROM SELECT `priority` FROM
message <dtml-var table>
WHERE WHERE
processing_node = 0 processing_node = 0
AND date <= UTC_TIMESTAMP() AND date <= UTC_TIMESTAMP()
......
...@@ -7,12 +7,13 @@ cache_time:0 ...@@ -7,12 +7,13 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>path <params>table
path
method_id method_id
active_process_uid active_process_uid
only_valid</params> only_valid</params>
SELECT count(path) as message_count FROM SELECT count(path) as message_count FROM
message <dtml-var table>
WHERE 1 = 1 WHERE 1 = 1
<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if> <dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if> <dtml-if expr="method_id is not None">AND method_id = <dtml-sqlvar method_id type="string"></dtml-if>
......
...@@ -7,15 +7,14 @@ cache_time:0 ...@@ -7,15 +7,14 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>delay <params>table
delay
processing_node</params> processing_node</params>
UPDATE UPDATE
message_queue <dtml-var table>
SET SET
date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND), date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND),
processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND) processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND)
WHERE
1 = 1
<dtml-if expr="processing_node is not None"> <dtml-if expr="processing_node is not None">
AND processing_node = <dtml-sqlvar processing_node type="int"> WHERE <dtml-sqltest processing_node type="int">
</dtml-if> </dtml-if>
...@@ -7,7 +7,8 @@ cache_time:0 ...@@ -7,7 +7,8 @@ cache_time:0
class_name: class_name:
class_file: class_file:
</dtml-comment> </dtml-comment>
<params>method_id <params>table
method_id
message_uid message_uid
path path
tag tag
...@@ -21,7 +22,7 @@ SELECT ...@@ -21,7 +22,7 @@ SELECT
* *
</dtml-if> </dtml-if>
FROM FROM
message <dtml-var table>
WHERE WHERE
processing_node > -10 processing_node > -10
<dtml-if expr="method_id is not None"> <dtml-if expr="method_id is not None">
......
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>delay
processing_node
retry</params>
UPDATE
message
SET
date = DATE_SUB(date, INTERVAL <dtml-sqlvar delay type="int"> SECOND),
processing_date = DATE_SUB(processing_date, INTERVAL <dtml-sqlvar delay type="int"> SECOND)
<dtml-if expr="retry is not None">
,retry = GREATEST(retry,<dtml-sqlvar retry type="int">) - <dtml-sqlvar retry type="int">
</dtml-if>
WHERE
1 = 1
<dtml-if expr="processing_node is not None">
AND processing_node = <dtml-sqlvar processing_node type="int">
</dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params></params>
SELECT * FROM
message_queue
ORDER BY
uid
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:0
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>
</params>
SELECT `priority` FROM
message_queue
WHERE
processing_node = 0
AND date <= UTC_TIMESTAMP()
ORDER BY priority
LIMIT 1
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>path
method_id
active_process_uid
only_valid</params>
SELECT count(path) as message_count FROM
message_queue
WHERE 1 = 1
<dtml-if expr="path is not None">AND path = <dtml-sqlvar path type="string"> </dtml-if>
<dtml-if expr="method_id is not None"> AND method_id = <dtml-sqlvar method_id type="string"> </dtml-if>
<dtml-if expr="only_valid"> AND processing_node > -2 </dtml-if>
<dtml-if expr="active_process_uid is not None"> AND active_process_uid = <dtml-sqlvar active_process_uid type="int"> </dtml-if>
<dtml-comment>
title:
connection_id:cmf_activity_sql_connection
max_rows:1000
max_cache:0
cache_time:0
class_name:
class_file:
</dtml-comment>
<params>method_id
message_uid
path
tag
count
serialization_tag
</params>
SELECT
<dtml-if expr="count">
COUNT(*) AS uid_count
<dtml-else>
*
</dtml-if>
FROM
message_queue
WHERE
processing_node > -10
<dtml-if expr="method_id is not None">
AND method_id IN (
<dtml-in method_id><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="message_uid is not None">AND uid = <dtml-sqlvar message_uid type="int"> </dtml-if>
<dtml-if expr="path is not None">
AND path IN (
<dtml-in path><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="tag is not None">
AND tag IN (
<dtml-in tag><dtml-sqlvar sequence-item type="string"><dtml-if sequence-end><dtml-else>,</dtml-if></dtml-in>
)
</dtml-if>
<dtml-if expr="serialization_tag is not None">
AND processing_node > -1
AND serialization_tag = <dtml-sqlvar serialization_tag type="string">
</dtml-if>
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment